You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/03 19:43:22 UTC
[pinot] branch master updated: Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2e03df02f6 Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)
2e03df02f6 is described below
commit 2e03df02f65d766bde040389e4e6b12ea09710fa
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Fri Mar 3 11:43:16 2023 -0800
Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)
---
.../multipleIndexPartialUpsertConfig.json | 41 +++++++++++
...artialUpsertMultipleComparisonColumnConfig.json | 41 +++++++++++
.../api/resources/PinotUpsertRestletResource.java | 27 +++++---
.../realtime/LLRealtimeSegmentDataManager.java | 2 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 3 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 48 ++++++++++---
.../local/realtime/impl/RealtimeSegmentConfig.java | 18 ++---
.../upsert/BasePartitionUpsertMetadataManager.java | 14 ++--
.../upsert/BaseTableUpsertMetadataManager.java | 11 +--
.../segment/local/upsert/ComparisonColumns.java | 67 ++++++++++++++++++
...oncurrentMapPartitionUpsertMetadataManager.java | 4 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 2 +-
.../segment/local/upsert/PartialUpsertHandler.java | 8 +--
.../pinot/segment/local/upsert/UpsertUtils.java | 79 ++++++++++++++++++++--
.../segment/local/utils/TableConfigUtils.java | 12 ++--
.../ImmutableSegmentImplUpsertSnapshotTest.java | 2 +-
.../mutable/MutableSegmentImplTestUtils.java | 4 +-
.../mutable/MutableSegmentImplUpsertTest.java | 64 ++++++++++++++----
...rrentMapPartitionUpsertMetadataManagerTest.java | 15 ++--
.../local/upsert/PartialUpsertHandlerTest.java | 7 +-
.../src/test/resources/data/test_upsert_data.json | 21 +++++-
.../test/resources/data/test_upsert_schema.json | 4 ++
.../apache/pinot/spi/config/table/TableConfig.java | 4 +-
.../pinot/spi/config/table/UpsertConfig.java | 56 +++++++--------
.../pinot/spi/config/table/UpsertConfigTest.java | 3 +-
25 files changed, 443 insertions(+), 114 deletions(-)
diff --git a/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json
new file mode 100644
index 0000000000..8e273a0514
--- /dev/null
+++ b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json
@@ -0,0 +1,41 @@
+{
+ "tableName": "testTableUpsert",
+ "segmentsConfig": {
+ "replication": "1",
+ "schemaName": "testTable",
+ "timeColumnName": "time"
+ },
+ "fieldConfigList": [
+ {
+ "encodingType": "DICTIONARY",
+ "indexTypes": [
+ "INVERTED",
+ "RANGE"
+ ],
+ "name": "hits"
+ }
+ ],
+ "tableIndexConfig": {
+ "invertedIndexColumns": [
+ "hits"
+ ],
+ "rangeIndexColumns": [
+ "hits"
+ ],
+ "loadMode": "HEAP"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableType": "REALTIME",
+ "upsertConfig": {
+ "mode": "PARTIAL",
+ "comparisonColumn": "_comparison_column",
+ "partialUpsertStrategies": {},
+ "defaultPartialUpsertStrategy": "OVERWRITE",
+ "hashFunction": "MURMUR3",
+ "enableSnapshot": false
+ },
+ "metadata": {}
+}
diff --git a/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json
new file mode 100644
index 0000000000..06ac7c2624
--- /dev/null
+++ b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json
@@ -0,0 +1,41 @@
+{
+ "tableName": "testTableUpsertMultiComparison",
+ "segmentsConfig": {
+ "replication": "1",
+ "schemaName": "testTable",
+ "timeColumnName": "time"
+ },
+ "fieldConfigList": [
+ {
+ "encodingType": "DICTIONARY",
+ "indexTypes": [
+ "INVERTED",
+ "RANGE"
+ ],
+ "name": "hits"
+ }
+ ],
+ "tableIndexConfig": {
+ "invertedIndexColumns": [
+ "hits"
+ ],
+ "rangeIndexColumns": [
+ "hits"
+ ],
+ "loadMode": "HEAP"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableType": "REALTIME",
+ "upsertConfig": {
+ "mode": "PARTIAL",
+ "comparisonColumns": ["_comparison_column", "foo"],
+ "partialUpsertStrategies": {},
+ "defaultPartialUpsertStrategy": "OVERWRITE",
+ "hashFunction": "MURMUR3",
+ "enableSnapshot": false
+ },
+ "metadata": {}
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
index 363b288ef6..0c0fdaa8e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
@@ -129,14 +129,25 @@ public class PinotUpsertRestletResource {
// Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
// Here we only calculate the map content size. TODO: Add the map entry size and the array size within the map.
int bytesPerValue = 60;
- String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
- if (comparisonColumn != null) {
- FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
- if (!dt.isFixedWidth()) {
- String msg = "Not support data types for the comparison column";
- throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
- } else {
- bytesPerValue = 52 + dt.size();
+ List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+ if (comparisonColumns != null) {
+ int bytesPerArrayElem = 8; // object ref
+ bytesPerValue = 52;
+ for (String columnName : comparisonColumns) {
+ FieldSpec.DataType dt = schema.getFieldSpecFor(columnName).getDataType();
+ if (!dt.isFixedWidth()) {
+ String msg = "Not support data types for the comparison column";
+ throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+ } else {
+ if (comparisonColumns.size() == 1) {
+ bytesPerValue += dt.size();
+ } else {
+ bytesPerValue += bytesPerArrayElem + dt.size();
+ }
+ }
+ }
+ if (comparisonColumns.size() > 1) {
+ bytesPerValue += 48 + 4; // array overhead + comparableIndex integer
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 28a24ad937..9d962dbe3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1394,7 +1394,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
- .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn())
+ .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setFieldConfigList(tableConfig.getFieldConfigList());
// Create message decoder
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index f6df4e9b4e..66fbf400d0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -130,7 +130,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- "daysSinceEpoch", HashFunction.NONE, null, false, serverMetrics), new ThreadSafeMutableRoaringBitmap());
+ Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, false, serverMetrics),
+ new ThreadSafeMutableRoaringBitmap());
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index f3e97443cb..df75e10333 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -62,6 +62,7 @@ import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.local.upsert.ComparisonColumns;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
@@ -163,7 +164,7 @@ public class MutableSegmentImpl implements MutableSegment {
private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders;
private final UpsertConfig.Mode _upsertMode;
- private final String _upsertComparisonColumn;
+ private final List<String> _upsertComparisonColumns;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
// The valid doc ids are maintained locally instead of in the upsert metadata manager because:
@@ -408,12 +409,13 @@ public class MutableSegmentImpl implements MutableSegment {
"Metrics aggregation and upsert cannot be enabled together");
_partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
_validDocIds = new ThreadSafeMutableRoaringBitmap();
- String upsertComparisonColumn = config.getUpsertComparisonColumn();
- _upsertComparisonColumn = upsertComparisonColumn != null ? upsertComparisonColumn : _timeColumnName;
+ List<String> upsertComparisonColumns = config.getUpsertComparisonColumns();
+ _upsertComparisonColumns =
+ upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
} else {
_partitionUpsertMetadataManager = null;
_validDocIds = null;
- _upsertComparisonColumn = null;
+ _upsertComparisonColumns = null;
}
}
@@ -558,15 +560,45 @@ public class MutableSegmentImpl implements MutableSegment {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
if (isUpsertEnabled()) {
- Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
- Preconditions.checkState(upsertComparisonValue instanceof Comparable,
- "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
- return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
+ if (_upsertComparisonColumns.size() > 1) {
+ return multiComparisonRecordInfo(primaryKey, docId, row);
+ }
+ Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+ return new RecordInfo(primaryKey, docId, comparisonValue);
}
return new RecordInfo(primaryKey, docId, null);
}
+ private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { // >1 comparison column
+ int numComparisonColumns = _upsertComparisonColumns.size();
+ Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+ // Only the slot of the single non-null comparison column gets filled; every other slot stays null.
+ int comparableIndex = -1;
+ for (int i = 0; i < numComparisonColumns; i++) {
+ String columnName = _upsertComparisonColumns.get(i);
+
+ if (!row.isNullValue(columnName)) {
+ // Inbound records must have exactly 1 non-null value across all comparison columns, i.e. the comparison
+ // columns are mutually exclusive. If comparableIndex has already moved off its initial value (-1), a non-null
+ // value was already seen for this row, so a second non-null value violates that rule and fails this check.
+ // The check after the loop covers the opposite case, where no comparison column has a value at all.
+ Preconditions.checkState(comparableIndex == -1,
+ "Documents must have exactly 1 non-null comparison column value");
+ // Remember which column supplies this row's comparison value.
+ comparableIndex = i;
+
+ Object comparisonValue = row.getValue(columnName);
+ Preconditions.checkState(comparisonValue instanceof Comparable,
+ "Upsert comparison column: %s must be comparable", columnName);
+ comparisonValues[i] = (Comparable) comparisonValue;
+ }
+ }
+ Preconditions.checkState(comparableIndex != -1,
+ "Documents must have exactly 1 non-null comparison column value");
+ return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex));
+ }
+
private void updateDictionary(GenericRow row) {
for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
IndexContainer indexContainer = entry.getValue();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 9af8367b49..7068d3e382 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -62,7 +62,7 @@ public class RealtimeSegmentConfig {
private final boolean _aggregateMetrics;
private final boolean _nullHandlingEnabled;
private final UpsertConfig.Mode _upsertMode;
- private final String _upsertComparisonColumn;
+ private final List<String> _upsertComparisonColumns;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _consumerDir;
@@ -77,7 +77,7 @@ public class RealtimeSegmentConfig {
Map<String, H3IndexConfig> h3IndexConfigs, SegmentZKMetadata segmentZKMetadata, boolean offHeap,
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
- String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
+ String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
@@ -106,7 +106,7 @@ public class RealtimeSegmentConfig {
_nullHandlingEnabled = nullHandlingEnabled;
_consumerDir = consumerDir;
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
- _upsertComparisonColumn = upsertComparisonColumn;
+ _upsertComparisonColumns = upsertComparisonColumns;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
@@ -222,8 +222,8 @@ public class RealtimeSegmentConfig {
return _partitionDedupMetadataManager != null;
}
- public String getUpsertComparisonColumn() {
- return _upsertComparisonColumn;
+ public List<String> getUpsertComparisonColumns() {
+ return _upsertComparisonColumns;
}
public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
@@ -268,7 +268,7 @@ public class RealtimeSegmentConfig {
private boolean _nullHandlingEnabled = false;
private String _consumerDir;
private UpsertConfig.Mode _upsertMode;
- private String _upsertComparisonColumn;
+ private List<String> _upsertComparisonColumns;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
@@ -410,8 +410,8 @@ public class RealtimeSegmentConfig {
return this;
}
- public Builder setUpsertComparisonColumn(String upsertComparisonColumn) {
- _upsertComparisonColumn = upsertComparisonColumn;
+ public Builder setUpsertComparisonColumns(List<String> upsertComparisonColumns) {
+ _upsertComparisonColumns = upsertComparisonColumns;
return this;
}
@@ -440,7 +440,7 @@ public class RealtimeSegmentConfig {
_capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
_textIndexColumns, _fstIndexColumns, _jsonIndexConfigs, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
- _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager,
+ _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _partitionUpsertMetadataManager,
_partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 361052fdc2..066ef9cac1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -54,7 +54,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final String _tableNameWithType;
protected final int _partitionId;
protected final List<String> _primaryKeyColumns;
- protected final String _comparisonColumn;
+ protected final List<String> _comparisonColumns;
protected final HashFunction _hashFunction;
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
@@ -72,12 +72,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected int _numOutOfOrderEvents = 0;
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+ List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
@Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
- _comparisonColumn = comparisonColumn;
+ _comparisonColumns = comparisonColumns;
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
@@ -131,8 +131,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
immutableSegmentImpl.deleteValidDocIdsSnapshot();
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumn)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
+ _comparisonColumns)) {
Iterator<RecordInfo> recordInfoIterator;
if (validDocIds != null) {
recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds);
@@ -225,8 +225,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return;
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
- _comparisonColumn)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
+ _comparisonColumns)) {
Iterator<RecordInfo> recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
replaceSegment(segment, null, recordInfoIterator, oldSegment);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 7147341b69..2ceb5ddc72 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.ThreadSafe;
@@ -35,7 +36,7 @@ import org.apache.pinot.spi.data.Schema;
public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
protected String _tableNameWithType;
protected List<String> _primaryKeyColumns;
- protected String _comparisonColumn;
+ protected List<String> _comparisonColumns;
protected HashFunction _hashFunction;
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
@@ -54,9 +55,9 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
"Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType);
- _comparisonColumn = upsertConfig.getComparisonColumn();
- if (_comparisonColumn == null) {
- _comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ _comparisonColumns = upsertConfig.getComparisonColumns();
+ if (_comparisonColumns == null) {
+ _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
}
_hashFunction = upsertConfig.getHashFunction();
@@ -67,7 +68,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
_partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
- _comparisonColumn);
+ _comparisonColumns);
}
_enableSnapshot = upsertConfig.isEnableSnapshot();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
new file mode 100644
index 0000000000..de223f27d3
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+ private final Comparable[] _values;
+ private final int _comparableIndex;
+ // _values[_comparableIndex] is the value this instance is ordered by; other slots may be filled by compareTo.
+ public ComparisonColumns(Comparable[] values, int comparableIndex) {
+ _values = values;
+ _comparableIndex = comparableIndex;
+ }
+ // Returns the backing array itself (not a copy), so callers observe any merge performed by compareTo.
+ public Comparable[] getValues() {
+ return _values;
+ }
+
+ @Override
+ public int compareTo(ComparisonColumns other) {
+ // Only this instance's _values[_comparableIndex] is compared, against the same slot of other. If a record carries
+ // several non-null values, that is the user's responsibility; no particular behavior is guaranteed for it.
+ int comparisonResult;
+
+ Comparable comparisonValue = _values[_comparableIndex];
+ Comparable otherComparisonValue = other.getValues()[_comparableIndex];
+ // NOTE(review): if comparisonValue itself is null, the compareTo call below throws NullPointerException.
+ if (otherComparisonValue == null) {
+ // Keep this record because the existing record has no value for the same comparison column, therefore the
+ // (lack of) existing value could not possibly cause the new value to be rejected.
+ comparisonResult = 1;
+ } else {
+ comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+ }
+
+ if (comparisonResult >= 0) {
+ // TODO(egalpin): This method currently may have side-effects on _values. Depending on the comparison result,
+ // entities from {@param other} may be merged into _values. This really should not be done implicitly as part
+ // of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
+ // {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
+ // implementations of {@link BasePartitionUpsertMetadataManager}.
+ for (int i = 0; i < _values.length; i++) {
+ if (i != _comparableIndex) {
+ _values[i] = other._values[i];
+ }
+ }
+ }
+ return comparisonResult;
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index b1687afdf8..f7529a0011 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -57,9 +57,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
private final GenericRow _reuse = new GenericRow();
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+ List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
@Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
- super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
+ super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, hashFunction, partialUpsertHandler,
enableSnapshot, serverMetrics);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 67d6fe773b..cfc6e529a4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
- _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
+ _comparisonColumns, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 3444a5ac54..f18a95ca00 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -35,13 +35,13 @@ public class PartialUpsertHandler {
// _column2Mergers maintains the mapping of merge strategies per columns.
private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
private final PartialUpsertMerger _defaultPartialUpsertMerger;
- private final String _comparisonColumn;
+ private final List<String> _comparisonColumns;
private final List<String> _primaryKeyColumns;
public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
- UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
+ UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
_defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
- _comparisonColumn = comparisonColumn;
+ _comparisonColumns = comparisonColumns;
_primaryKeyColumns = schema.getPrimaryKeyColumns();
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
@@ -66,7 +66,7 @@ public class PartialUpsertHandler {
*/
public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
for (String column : previousRecord.getFieldToValueMap().keySet()) {
- if (!_primaryKeyColumns.contains(column) && !_comparisonColumn.equals(column)) {
+ if (!_primaryKeyColumns.contains(column) && !_comparisonColumns.contains(column)) {
if (!previousRecord.isNullValue(column)) {
if (newRecord.isNullValue(column)) {
newRecord.putValue(column, previousRecord.getValue(column));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 8ac5fee8d3..acda88a27b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -75,19 +75,31 @@ public class UpsertUtils {
};
}
+ public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns,
+ List<String> comparisonColumns) { // picks the single- or multi-column RecordInfoReader constructor
+ if (comparisonColumns.size() > 1) { // backed by MultiComparisonColumnReader
+ return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns);
+ }
+ return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0)); // NOTE(review): empty list throws
+ }
+
public static class RecordInfoReader implements Closeable {
public final PrimaryKeyReader _primaryKeyReader;
- public final PinotSegmentColumnReader _comparisonColumnReader;
+ public final ComparisonColumnReader _comparisonColumnReader;
+
+ public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns) {
+ _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+ _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
+ }
public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, String comparisonColumn) {
_primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
- _comparisonColumnReader = new PinotSegmentColumnReader(segment, comparisonColumn);
+ _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumn);
}
public RecordInfo getRecordInfo(int docId) {
PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
- Comparable comparisonValue = (Comparable) getValue(_comparisonColumnReader, docId);
- return new RecordInfo(primaryKey, docId, comparisonValue);
+ return new RecordInfo(primaryKey, docId, _comparisonColumnReader.getComparisonValue(docId));
}
@Override
@@ -134,8 +146,65 @@ public class UpsertUtils {
}
}
- private static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
+ static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
Object value = columnReader.getValue(docId);
return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
}
+
+
+ public interface ComparisonColumnReader extends Closeable { // per-segment reader of upsert comparison values
+ Comparable getComparisonValue(int docId); // value stored in the RecordInfo built for docId
+ }
+
+ public static class SingleComparisonColumnReader implements UpsertUtils.ComparisonColumnReader {
+ private final PinotSegmentColumnReader _comparisonColumnReader;
+
+ public SingleComparisonColumnReader(IndexSegment segment, String comparisonColumn) {
+ _comparisonColumnReader = new PinotSegmentColumnReader(segment, comparisonColumn);
+ }
+
+ @Override
+ public Comparable getComparisonValue(int docId) {
+ return (Comparable) UpsertUtils.getValue(_comparisonColumnReader, docId); // byte[] -> ByteArray (Comparable)
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _comparisonColumnReader.close();
+ }
+ }
+
+ public static class MultiComparisonColumnReader implements UpsertUtils.ComparisonColumnReader {
+ private final PinotSegmentColumnReader[] _comparisonColumnReaders;
+
+ public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) {
+ _comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()];
+ for (int i = 0; i < comparisonColumns.size(); i++) {
+ _comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i));
+ }
+ }
+
+ @Override
+ public Comparable getComparisonValue(int docId) {
+ Comparable[] comparisonValues = new Comparable[_comparisonColumnReaders.length];
+ for (int i = 0; i < _comparisonColumnReaders.length; i++) {
+ PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
+ Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+ comparisonValues[i] = comparisonValue;
+ }
+
+ // Note that the comparable index is negative here to indicate that this instance could be the argument to
+ // ComparisonColumns#compareTo, but should never call compareTo itself.
+ return new ComparisonColumns(comparisonValues, -1);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ for (PinotSegmentColumnReader comparisonColumnReader : _comparisonColumnReaders) {
+ comparisonColumnReader.close();
+ }
+ }
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 987d10de6c..046c0175e7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -573,9 +573,11 @@ public final class TableConfigUtils {
"The upsert table cannot have star-tree index.");
// comparison column exists
- if (tableConfig.getUpsertConfig().getComparisonColumn() != null) {
- String comparisonCol = tableConfig.getUpsertConfig().getComparisonColumn();
- Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+ if (tableConfig.getUpsertConfig().getComparisonColumns() != null) {
+ List<String> comparisonCols = tableConfig.getUpsertConfig().getComparisonColumns();
+ for (String comparisonCol : comparisonCols) {
+ Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+ }
}
}
validateAggregateMetricsForUpsertConfig(tableConfig);
@@ -644,8 +646,8 @@ public final class TableConfigUtils {
UpsertConfig.Strategy columnStrategy = entry.getValue();
Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns");
- if (upsertConfig.getComparisonColumn() != null) {
- Preconditions.checkState(!upsertConfig.getComparisonColumn().equals(column),
+ if (upsertConfig.getComparisonColumns() != null) {
+ Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column),
"Merger cannot be applied to comparison column");
} else {
Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column),
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
index 6c31c3981f..4f3858d55c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
@@ -118,7 +118,7 @@ public class ImmutableSegmentImplUpsertSnapshotTest {
ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
_partitionUpsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- "daysSinceEpoch", HashFunction.NONE, null, true, serverMetrics);
+ Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, true, serverMetrics);
_immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 4c0a924c81..04a6e09783 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -96,7 +96,7 @@ public class MutableSegmentImplTestUtils {
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
- String comparisonColumn = upsertConfig == null ? null : upsertConfig.getComparisonColumn();
+ List<String> comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns();
RealtimeSegmentConfig realtimeSegmentConfig =
new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
.setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
@@ -105,7 +105,7 @@ public class MutableSegmentImplTestUtils {
.setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
- .setUpsertComparisonColumn(comparisonColumn)
+ .setUpsertComparisonColumns(comparisonColumns)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs).build();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a2439d9f8f..4f5ad8bc6c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -20,7 +20,9 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
import java.io.File;
import java.net.URL;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -52,13 +54,26 @@ public class MutableSegmentImplUpsertTest {
private MutableSegmentImpl _mutableSegmentImpl;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
- private void setup(HashFunction hashFunction)
+ private UpsertConfig createPartialUpsertConfig(HashFunction hashFunction) {
+ UpsertConfig partialConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ partialConfig.setHashFunction(hashFunction);
+ partialConfig.setComparisonColumns(Arrays.asList("secondsSinceEpoch", "otherComparisonColumn"));
+ partialConfig.setPartialUpsertStrategies(new HashMap<>());
+ partialConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+ return partialConfig;
+ }
+
+ private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
+ UpsertConfig fullConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ fullConfig.setHashFunction(hashFunction);
+ return fullConfig;
+ }
+
+ private void setup(UpsertConfig upsertConfigWithHash)
throws Exception {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
- UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
- upsertConfigWithHash.setHashFunction(hashFunction);
_tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
.build();
@@ -87,18 +102,43 @@ public class MutableSegmentImplUpsertTest {
@Test
public void testHashFunctions()
throws Exception {
- testUpsertIngestion(HashFunction.NONE);
- testUpsertIngestion(HashFunction.MD5);
- testUpsertIngestion(HashFunction.MURMUR3);
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.NONE));
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.MD5));
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
}
- private void testUpsertIngestion(HashFunction hashFunction)
+ @Test
+ public void testMultipleComparisonColumns()
throws Exception {
- setup(hashFunction);
+ testUpsertIngestion(createPartialUpsertConfig(HashFunction.NONE));
+ testUpsertIngestion(createPartialUpsertConfig(HashFunction.MD5));
+ testUpsertIngestion(createPartialUpsertConfig(HashFunction.MURMUR3));
+ }
+
+ private void testUpsertIngestion(UpsertConfig upsertConfig)
+ throws Exception {
+ setup(upsertConfig);
ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
- Assert.assertFalse(bitmap.contains(0));
- Assert.assertTrue(bitmap.contains(1));
- Assert.assertTrue(bitmap.contains(2));
- Assert.assertFalse(bitmap.contains(3));
+ if (upsertConfig.getComparisonColumns() == null) {
+ // aa
+ Assert.assertFalse(bitmap.contains(0));
+ Assert.assertTrue(bitmap.contains(1));
+ Assert.assertFalse(bitmap.contains(2));
+ Assert.assertFalse(bitmap.contains(3));
+ // bb
+ Assert.assertFalse(bitmap.contains(4));
+ Assert.assertTrue(bitmap.contains(5));
+ Assert.assertFalse(bitmap.contains(6));
+ } else {
+ // aa
+ Assert.assertFalse(bitmap.contains(0));
+ Assert.assertFalse(bitmap.contains(1));
+ Assert.assertTrue(bitmap.contains(2));
+ Assert.assertFalse(bitmap.contains(3));
+ // bb
+ Assert.assertFalse(bitmap.contains(4));
+ Assert.assertTrue(bitmap.contains(5));
+ Assert.assertFalse(bitmap.contains(6));
+ }
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 6b4bf34368..1e3f130059 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -73,9 +73,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
throws IOException {
+ String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -217,7 +218,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
List<RecordInfo> recordInfoList = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
- recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+ recordInfoList.add(
+ new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
}
return recordInfoList;
}
@@ -286,7 +288,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
- assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
+ assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
+ comparisonValue);
}
@Test
@@ -299,9 +302,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddRecord(HashFunction hashFunction)
throws IOException {
+ String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -329,6 +333,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120)));
+
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -339,6 +344,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
+
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -349,6 +355,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100)));
+
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 913215c85d..fd335ad41b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -43,7 +44,8 @@ public class PartialUpsertHandlerTest {
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
PartialUpsertHandler handler =
- new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+ new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE,
+ Collections.singletonList("hoursSinceEpoch"));
// both records are null.
GenericRow previousRecord = new GenericRow();
@@ -98,7 +100,8 @@ public class PartialUpsertHandlerTest {
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
PartialUpsertHandler handler =
- new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+ new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE,
+ Collections.singletonList("hoursSinceEpoch"));
// previousRecord is null default value, while newRecord is not.
GenericRow previousRecord = new GenericRow();
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_data.json b/pinot-segment-local/src/test/resources/data/test_upsert_data.json
index 70fe90b4c1..5407326f9c 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_data.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_data.json
@@ -4,15 +4,30 @@
"description" : "first",
"secondsSinceEpoch": 1567205394
},
+ {
+ "event_id": "aa",
+ "description" : "update",
+ "secondsSinceEpoch": 1567205397
+ },
+ {
+ "event_id": "aa",
+ "description" : "first arrival, other column",
+ "otherComparisonColumn": 1567205395
+ },
+ {
+ "event_id": "aa",
+ "description" : "late arrival, other column",
+ "otherComparisonColumn": 1567205393
+ },
{
"event_id": "bb",
"description" : "first",
"secondsSinceEpoch": 1567205396
},
{
- "event_id": "aa",
- "description" : "update",
- "secondsSinceEpoch": 1567205397
+ "event_id": "bb",
+ "description" : "duplicate arrival",
+ "secondsSinceEpoch": 1567205396
},
{
"event_id": "bb",
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_schema.json b/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
index 859c6f70c7..8e6b7d2849 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
@@ -8,6 +8,10 @@
{
"name": "description",
"dataType": "STRING"
+ },
+ {
+ "name": "otherComparisonColumn",
+ "dataType": "LONG"
}
],
"timeFieldSpec": {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index a71ddb47f6..bae7b7c798 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -359,8 +359,8 @@ public class TableConfig extends BaseJsonConfig {
@JsonIgnore
@Nullable
- public String getUpsertComparisonColumn() {
- return _upsertConfig == null ? null : _upsertConfig.getComparisonColumn();
+ public List<String> getUpsertComparisonColumns() {
+ return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
}
@JsonProperty(TUNER_CONFIG_LIST_KEY)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 0c06d9e07d..68b532dc83 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -18,12 +18,12 @@
*/
package org.apache.pinot.spi.config.table;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.spi.config.BaseJsonConfig;
@@ -51,8 +51,8 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("default upsert strategy for partial mode")
private Strategy _defaultPartialUpsertStrategy;
- @JsonPropertyDescription("Column for upsert comparison, default to time column")
- private String _comparisonColumn;
+ @JsonPropertyDescription("Columns for upsert comparison, default to time column")
+ private List<String> _comparisonColumns;
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
private boolean _enableSnapshot;
@@ -63,28 +63,6 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Custom configs for upsert metadata manager")
private Map<String, String> _metadataManagerConfigs;
- @Deprecated
- public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
- @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies,
- @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy defaultPartialUpsertStrategy,
- @JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
- @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
- Preconditions.checkArgument(mode != null, "Upsert mode must be configured");
- _mode = mode;
-
- if (mode == Mode.PARTIAL) {
- _partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>();
- _defaultPartialUpsertStrategy =
- defaultPartialUpsertStrategy != null ? defaultPartialUpsertStrategy : Strategy.OVERWRITE;
- } else {
- _partialUpsertStrategies = null;
- _defaultPartialUpsertStrategy = null;
- }
-
- _comparisonColumn = comparisonColumn;
- _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
- }
-
public UpsertConfig(Mode mode) {
_mode = mode;
}
@@ -97,6 +75,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _mode;
}
+ public void setMode(Mode mode) { // counterpart of getMode(); overwrites the mode given to the constructor
+ _mode = mode;
+ }
+
public HashFunction getHashFunction() {
return _hashFunction;
}
@@ -110,8 +92,8 @@ public class UpsertConfig extends BaseJsonConfig {
return _defaultPartialUpsertStrategy;
}
- public String getComparisonColumn() {
- return _comparisonColumn;
+ public List<String> getComparisonColumns() {
+ return _comparisonColumns;
}
public boolean isEnableSnapshot() {
@@ -154,10 +136,22 @@ public class UpsertConfig extends BaseJsonConfig {
* same primary key, the record with the larger value of the time column is picked as the
* latest update.
* However, there are cases when users need to use another column to determine the order.
- * In such case, you can use option comparisonColumn to override the column used for comparison.
+ * In such case, you can use option comparisonColumn (or comparisonColumns) to override the column(s) used for
+ * comparison. When using multiple comparison columns, typically in the case of partial upserts, each input document
+ * is expected to have exactly one non-null comparison column. Multiple non-null comparison values in an input
+ * document _will_ result in undefined behaviour. Typically, one comparison column is allocated per distinct
+ * producer of data when multiple producers sink to the same table.
*/
+ public void setComparisonColumns(List<String> comparisonColumns) {
+ if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+ _comparisonColumns = comparisonColumns;
+ }
+ }
+
public void setComparisonColumn(String comparisonColumn) {
- _comparisonColumn = comparisonColumn;
+ if (comparisonColumn != null) {
+ _comparisonColumns = Collections.singletonList(comparisonColumn);
+ }
}
public void setEnableSnapshot(boolean enableSnapshot) {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index df5875fe33..1311de9d41 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.config.table;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.testng.annotations.Test;
@@ -33,7 +34,7 @@ public class UpsertConfigTest {
assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
upsertConfig1.setComparisonColumn("comparison");
- assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
+ assertEquals(upsertConfig1.getComparisonColumns(), Collections.singletonList("comparison"));
upsertConfig1.setHashFunction(HashFunction.MURMUR3);
assertEquals(upsertConfig1.getHashFunction(), HashFunction.MURMUR3);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org