You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2022/01/20 00:45:37 UTC
[pinot] branch master updated: Add global strategy for partial upsert (#7906)
This is an automated email from the ASF dual-hosted git repository.
yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d317c59 Add global strategy for partial upsert (#7906)
d317c59 is described below
commit d317c5909454a3732716f24008d2bcf96305ef90
Author: deemoliu <qi...@uber.com>
AuthorDate: Wed Jan 19 16:44:09 2022 -0800
Add global strategy for partial upsert (#7906)
* Add global strategy for partial upsert
* fix UT setup
* try to fix lint
* fix tests
* handle empty globalUpsertStrategy
* update defaultValue for full upsert to be null
* update _globalUpsertStrategy to _defaultPartialUpsertStrategy
* try to fix lint
* fix checkstyle
* add taskConfig test setup code
* include all physical columns (including date time columns) except for primary key columns and the comparison column
* fix partial upsert handler merge tests
* Annotate comparison column as nullable, use main time column
* simplified partialUpsertHandler (comparison column is non-null)
* fix checkstyle
---
.../common/utils/config/TableConfigSerDeTest.java | 9 +-
.../controller/helix/PinotResourceManagerTest.java | 2 +-
.../manager/realtime/RealtimeTableDataManager.java | 9 +-
.../tests/BaseClusterIntegrationTest.java | 2 +-
.../segment/local/upsert/PartialUpsertHandler.java | 14 ++-
.../MutableSegmentImplUpsertComparisonColTest.java | 4 +-
.../mutable/MutableSegmentImplUpsertTest.java | 4 +-
.../local/upsert/PartialUpsertHandlerTest.java | 71 ++++++++++++-
.../segment/local/utils/TableConfigUtilsTest.java | 110 ++++++++++-----------
.../pinot/spi/config/table/UpsertConfig.java | 11 +++
.../pinot/spi/config/table/UpsertConfigTest.java | 11 ++-
11 files changed, 170 insertions(+), 77 deletions(-)
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 17ce0dd..164b978 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -228,10 +228,8 @@ public class TableConfigSerDeTest {
Map<String, String> properties = new HashMap<>();
properties.put("foo", "bar");
properties.put("foobar", "potato");
- List<FieldConfig> fieldConfigList = Arrays.asList(
- new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, Lists.newArrayList(
- FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null,
- properties),
+ List<FieldConfig> fieldConfigList = Arrays.asList(new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY,
+ Lists.newArrayList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, properties),
new FieldConfig("column2", null, Collections.emptyList(), null, null),
new FieldConfig("column3", FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.SNAPPY, null));
@@ -251,7 +249,8 @@ public class TableConfigSerDeTest {
{
// with upsert config
UpsertConfig upsertConfig =
- new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.NONE);
+ new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison",
+ UpsertConfig.HashFunction.NONE);
TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(upsertConfig).build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index a0938d3..56cc48b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
realtimeTableConfig.getValidationConfig()
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null));
+ realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null));
ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b74a9d7..7ca8275 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -164,8 +164,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
PartialUpsertHandler partialUpsertHandler = null;
if (isPartialUpsertEnabled()) {
- partialUpsertHandler =
- new PartialUpsertHandler(_helixManager, _tableNameWithType, upsertConfig.getPartialUpsertStrategies());
+ String comparisonColumn = upsertConfig.getComparisonColumn();
+ if (comparisonColumn == null) {
+ comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ }
+ partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
+ upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
+ comparisonColumn);
}
UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction();
_tableUpsertMetadataManager =
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index b98c7c4..a453fdc 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
}
/**
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 7bc6e6a..9a23b73 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
@@ -48,13 +49,22 @@ public class PartialUpsertHandler {
private final String _tableNameWithType;
private boolean _allSegmentsLoaded;
- public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType,
- Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+ public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
+ String comparisonColumn) {
_helixManager = helixManager;
_tableNameWithType = tableNameWithType;
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
_column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}
+ // For all physical columns (including date time columns) except for primary key columns and comparison column.
+ // If no comparison column is configured, use main time column as the comparison time.
+ for (String columnName : schema.getPhysicalColumnNames()) {
+ if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName)
+ && !comparisonColumn.equals(columnName)) {
+ _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
+ }
+ }
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index a65e7a9..57f1c83 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -57,7 +57,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null)).build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
@@ -65,7 +65,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
UpsertConfig.HashFunction.NONE).getOrCreatePartitionManager(0);
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
- false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null), "secondsSinceEpoch",
+ false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null), "secondsSinceEpoch",
_partitionUpsertMetadataManager);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index fc8b230..0bcbb01 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -55,7 +55,7 @@ public class MutableSegmentImplUpsertTest {
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction)).build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
@@ -63,7 +63,7 @@ public class MutableSegmentImplUpsertTest {
.getOrCreatePartitionManager(0);
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
- false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction), "secondsSinceEpoch",
+ false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction), "secondsSinceEpoch",
_partitionUpsertMetadataManager);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 5335d51..471a46c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -36,10 +39,18 @@ public class PartialUpsertHandlerTest {
@Test
public void testMerge() {
HelixManager helixManager = Mockito.mock(HelixManager.class);
+
+ Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("field1", FieldSpec.DataType.LONG)
+ .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
+ .setPrimaryKeyColumns(Arrays.asList("pk")).build();
+
String realtimeTableName = "testTable_REALTIME";
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
- PartialUpsertHandler handler = new PartialUpsertHandler(helixManager, realtimeTableName, partialUpsertStrategies);
+ PartialUpsertHandler handler =
+ new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
+ UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
// both records are null.
GenericRow previousRecord = new GenericRow();
@@ -61,13 +72,18 @@ public class PartialUpsertHandlerTest {
assertEquals(newRecord.getValue("field1"), 2);
// newRecord is default null value, while previousRecord is not.
+ // field1 should not be incremented since the newRecord is null.
+ // special case: field2 should be overridden by null value because we didn't enable default partial upsert strategy.
previousRecord.clear();
incomingRecord.clear();
previousRecord.putValue("field1", 1);
+ previousRecord.putValue("field2", 2);
incomingRecord.putDefaultNullValue("field1", 2);
+ incomingRecord.putDefaultNullValue("field2", 0);
newRecord = handler.merge(previousRecord, incomingRecord);
assertFalse(newRecord.isNullValue("field1"));
assertEquals(newRecord.getValue("field1"), 1);
+ assertTrue(newRecord.isNullValue("field2"));
// neither of records is null.
previousRecord.clear();
@@ -78,4 +94,57 @@ public class PartialUpsertHandlerTest {
assertFalse(newRecord.isNullValue("field1"));
assertEquals(newRecord.getValue("field1"), 3);
}
+
+ @Test
+ public void testMergeWithDefaultPartialUpsertStrategy() {
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+
+ Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG)
+ .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
+ .setPrimaryKeyColumns(Arrays.asList("pk")).build();
+
+ String realtimeTableName = "testTable_REALTIME";
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
+ partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
+ PartialUpsertHandler handler =
+ new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
+ UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+
+ // previousRecord is null default value, while newRecord is not.
+ GenericRow previousRecord = new GenericRow();
+ GenericRow incomingRecord = new GenericRow();
+ previousRecord.putDefaultNullValue("field1", 1);
+ previousRecord.putDefaultNullValue("field2", 2);
+ incomingRecord.putValue("field1", 2);
+ incomingRecord.putValue("field2", 1);
+ GenericRow newRecord = handler.merge(previousRecord, incomingRecord);
+ assertFalse(newRecord.isNullValue("field1"));
+ assertEquals(newRecord.getValue("field1"), 2);
+ assertEquals(newRecord.getValue("field2"), 1);
+
+ // newRecord is default null value, while previousRecord is not.
+ // field1 should not be incremented since the newRecord is null.
+ // field2 should not be overridden by null value since we have default partial upsert strategy.
+ previousRecord.clear();
+ incomingRecord.clear();
+ previousRecord.putValue("field1", 8);
+ previousRecord.putValue("field2", 8);
+ incomingRecord.putDefaultNullValue("field1", 1);
+ incomingRecord.putDefaultNullValue("field2", 0);
+ newRecord = handler.merge(previousRecord, incomingRecord);
+ assertEquals(newRecord.getValue("field1"), 8);
+ assertEquals(newRecord.getValue("field2"), 8);
+
+ // neither of records is null.
+ previousRecord.clear();
+ incomingRecord.clear();
+ previousRecord.putValue("field1", 1);
+ previousRecord.putValue("field2", 100);
+ incomingRecord.putValue("field1", 2);
+ incomingRecord.putValue("field2", 1000);
+ newRecord = handler.merge(previousRecord, incomingRecord);
+ assertEquals(newRecord.getValue("field1"), 3);
+ assertEquals(newRecord.getValue("field2"), 1000);
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index eacb587..928a01a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -357,8 +357,7 @@ public class TableConfigUtilsTest {
// input field name used as destination field
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")),
- null))
+ new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), null))
.build();
try {
TableConfigUtils.validate(tableConfig, schema);
@@ -505,14 +504,14 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
// 1 tier configs
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null))).build();
TableConfigUtils.validate(tableConfig, schema);
// 2 tier configs, case insensitive check
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), "tier2_tag_OFFLINE", null, null))).build();
@@ -527,8 +526,8 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
// tier name empty
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(
new TierConfig("", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier1_tag_OFFLINE", null, null))).build();
try {
@@ -539,8 +538,8 @@ public class TableConfigUtilsTest {
}
// tier name repeats
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "100d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -552,8 +551,8 @@ public class TableConfigUtilsTest {
}
// segmentSelectorType invalid
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
new TierConfig("tier2", "unsupportedSegmentSelector", "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier2_tag_OFFLINE", null, null))).build();
@@ -565,8 +564,8 @@ public class TableConfigUtilsTest {
}
// segmentAge not provided for TIME segmentSelectorType
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(
new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -578,8 +577,8 @@ public class TableConfigUtilsTest {
}
// segmentAge invalid
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "3600",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -592,11 +591,10 @@ public class TableConfigUtilsTest {
}
// storageType invalid
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(
- new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType",
- "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
- TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType",
+ "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
try {
TableConfigUtils.validate(tableConfig, schema);
@@ -606,8 +604,8 @@ public class TableConfigUtilsTest {
}
// serverTag not provided for PINOT_SERVER storageType
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, null, null, null))).build();
@@ -619,8 +617,8 @@ public class TableConfigUtilsTest {
}
// serverTag invalid
- tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
- Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+ .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag", null, null),
new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -1046,7 +1044,7 @@ public class TableConfigUtilsTest {
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1055,7 +1053,7 @@ public class TableConfigUtilsTest {
}
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1070,14 +1068,14 @@ public class TableConfigUtilsTest {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
} catch (IllegalStateException e) {
- Assert.assertEquals(e.getMessage(),
- "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME");
+ Assert
+ .assertEquals(e.getMessage(), "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME");
}
Map<String, String> streamConfigs = getStreamConfigs();
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs)
- .build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
+ .setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1087,8 +1085,8 @@ public class TableConfigUtilsTest {
streamConfigs.put("stream.kafka.consumer.type", "simple");
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs)
- .build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
+ .setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1098,16 +1096,15 @@ public class TableConfigUtilsTest {
}
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
- StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null,
- Collections.singletonList(
- new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10);
+ StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
+ .singletonList(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10);
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
try {
@@ -1131,9 +1128,9 @@ public class TableConfigUtilsTest {
Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
- TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null))
- .setNullHandlingEnabled(false)
+ TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(
+ new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
+ null)).setNullHandlingEnabled(false)
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
try {
@@ -1191,18 +1188,17 @@ public class TableConfigUtilsTest {
@Test
public void testTaskConfig() {
Schema schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
- .setPrimaryKeyColumns(Lists.newArrayList("myCol"))
- .build();
- Map<String, String> realtimeToOfflineTaskConfig =
- ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType",
- "rollup", "myCol.aggregationType", "max");
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+ Map<String, String> realtimeToOfflineTaskConfig = ImmutableMap
+ .of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", "rollup",
+ "myCol.aggregationType", "max");
Map<String, String> segmentGenerationAndPushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *");
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(
- new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
- "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
+ new TableTaskConfig(ImmutableMap
+ .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
+ segmentGenerationAndPushTaskConfig))).build();
// validate valid config
TableConfigUtils.validateTaskConfigs(tableConfig, schema);
@@ -1221,11 +1217,11 @@ public class TableConfigUtilsTest {
}
// invalid Upsert config with RealtimeToOfflineTask
- tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setTimeColumnName(TIME_COLUMN)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setTaskConfig(new TableTaskConfig(
- ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
- "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).setTaskConfig(
+ new TableTaskConfig(ImmutableMap
+ .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
+ segmentGenerationAndPushTaskConfig))).build();
try {
TableConfigUtils.validateTaskConfigs(tableConfig, schema);
Assert.fail();
@@ -1293,8 +1289,8 @@ public class TableConfigUtilsTest {
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.consumer.type", "highLevel");
streamConfigs.put("stream.kafka.topic.name", "test");
- streamConfigs.put("stream.kafka.decoder.class.name",
- "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ streamConfigs
+ .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
return streamConfigs;
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 9184bff..5afbe9c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -53,12 +53,16 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Partial update strategies.")
private final Map<String, Strategy> _partialUpsertStrategies;
+ @JsonPropertyDescription("default upsert strategy for partial mode")
+ private final Strategy _defaultPartialUpsertStrategy;
+
@JsonPropertyDescription("Column for upsert comparison, default to time column")
private final String _comparisonColumn;
@JsonCreator
public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
@JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies,
+ @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy defaultPartialUpsertStrategy,
@JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
@JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
Preconditions.checkArgument(mode != null, "Upsert mode must be configured");
@@ -66,8 +70,11 @@ public class UpsertConfig extends BaseJsonConfig {
if (mode == Mode.PARTIAL) {
_partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>();
+ _defaultPartialUpsertStrategy =
+ defaultPartialUpsertStrategy != null ? defaultPartialUpsertStrategy : Strategy.OVERWRITE;
} else {
_partialUpsertStrategies = null;
+ _defaultPartialUpsertStrategy = null;
}
_comparisonColumn = comparisonColumn;
@@ -87,6 +94,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _partialUpsertStrategies;
}
+ public Strategy getDefaultPartialUpsertStrategy() {
+ return _defaultPartialUpsertStrategy;
+ }
+
public String getComparisonColumn() {
return _comparisonColumn;
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 37b18a2..d7d53d2 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,18 +29,21 @@ public class UpsertConfigTest {
@Test
public void testUpsertConfig() {
- UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null);
+ UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null);
assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
- upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", null);
+ upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", null);
assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
- upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.MURMUR3);
+ upsertConfig1 =
+ new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", UpsertConfig.HashFunction.MURMUR3);
assertEquals(upsertConfig1.getHashFunction(), UpsertConfig.HashFunction.MURMUR3);
Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
- UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null);
+ UpsertConfig upsertConfig2 =
+ new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
+ null);
assertEquals(upsertConfig2.getPartialUpsertStrategies(), partialUpsertStratgies);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org