You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by yu...@apache.org on 2022/01/20 00:45:37 UTC

[pinot] branch master updated: Add global strategy for partial upsert (#7906)

This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d317c59  Add global strategy for partial upsert (#7906)
d317c59 is described below

commit d317c5909454a3732716f24008d2bcf96305ef90
Author: deemoliu <qi...@uber.com>
AuthorDate: Wed Jan 19 16:44:09 2022 -0800

    Add global strategy for partial upsert (#7906)
    
    * Add global strategy for partial upsert
    
    * fix UT setup
    
    * try fix lint
    
    * fix tests
    
    * handle empty globalUpsertStrategy
    
    * update defaultValue for full upsert to be null
    
    * update _globalUpsertStrategy to _defaultPartialUpsertStrategy
    
    * try fix lint
    
    * fix checkstyle
    
    * add taskConfig test setup code
    
    * include all physical columns (including date time columns) except for primary key columns and comparison column
    
    * fix partial upsert handler merge tests
    
    * Annotate comparison column as nullable, use main time column
    
    * simplified partialUpsertHandler (comparison column is non-null)
    
    * fix checkstyle
---
 .../common/utils/config/TableConfigSerDeTest.java  |   9 +-
 .../controller/helix/PinotResourceManagerTest.java |   2 +-
 .../manager/realtime/RealtimeTableDataManager.java |   9 +-
 .../tests/BaseClusterIntegrationTest.java          |   2 +-
 .../segment/local/upsert/PartialUpsertHandler.java |  14 ++-
 .../MutableSegmentImplUpsertComparisonColTest.java |   4 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   4 +-
 .../local/upsert/PartialUpsertHandlerTest.java     |  71 ++++++++++++-
 .../segment/local/utils/TableConfigUtilsTest.java  | 110 ++++++++++-----------
 .../pinot/spi/config/table/UpsertConfig.java       |  11 +++
 .../pinot/spi/config/table/UpsertConfigTest.java   |  11 ++-
 11 files changed, 170 insertions(+), 77 deletions(-)

diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 17ce0dd..164b978 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -228,10 +228,8 @@ public class TableConfigSerDeTest {
       Map<String, String> properties = new HashMap<>();
       properties.put("foo", "bar");
       properties.put("foobar", "potato");
-      List<FieldConfig> fieldConfigList = Arrays.asList(
-          new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, Lists.newArrayList(
-              FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null,
-              properties),
+      List<FieldConfig> fieldConfigList = Arrays.asList(new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY,
+              Lists.newArrayList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, properties),
           new FieldConfig("column2", null, Collections.emptyList(), null, null),
           new FieldConfig("column3", FieldConfig.EncodingType.RAW, Collections.emptyList(),
               FieldConfig.CompressionCodec.SNAPPY, null));
@@ -251,7 +249,8 @@ public class TableConfigSerDeTest {
     {
       // with upsert config
       UpsertConfig upsertConfig =
-          new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.NONE);
+          new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison",
+              UpsertConfig.HashFunction.NONE);
 
       TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(upsertConfig).build();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index a0938d3..56cc48b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
     realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
     realtimeTableConfig.getValidationConfig()
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null));
+    realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null));
     ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b74a9d7..7ca8275 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -164,8 +164,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
       PartialUpsertHandler partialUpsertHandler = null;
       if (isPartialUpsertEnabled()) {
-        partialUpsertHandler =
-            new PartialUpsertHandler(_helixManager, _tableNameWithType, upsertConfig.getPartialUpsertStrategies());
+        String comparisonColumn = upsertConfig.getComparisonColumn();
+        if (comparisonColumn == null) {
+          comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
+        }
+        partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
+            upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
+            comparisonColumn);
       }
       UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction();
       _tableUpsertMetadataManager =
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index b98c7c4..a453fdc 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
   }
 
   /**
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 7bc6e6a..9a23b73 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
@@ -48,13 +49,22 @@ public class PartialUpsertHandler {
   private final String _tableNameWithType;
   private boolean _allSegmentsLoaded;
 
-  public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType,
-      Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+  public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
+      Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
+      String comparisonColumn) {
     _helixManager = helixManager;
     _tableNameWithType = tableNameWithType;
     for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
       _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
     }
+    // For all physical columns (including date time columns) except for primary key columns and comparison column.
+    // If no comparison column is configured, use main time column as the comparison time.
+    for (String columnName : schema.getPhysicalColumnNames()) {
+      if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName)
+          && !comparisonColumn.equals(columnName)) {
+        _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
+      }
+    }
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index a65e7a9..57f1c83 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -57,7 +57,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null)).build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
@@ -65,7 +65,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
             UpsertConfig.HashFunction.NONE).getOrCreatePartitionManager(0);
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null), "secondsSinceEpoch",
+            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null), "secondsSinceEpoch",
             _partitionUpsertMetadataManager);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index fc8b230..0bcbb01 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -55,7 +55,7 @@ public class MutableSegmentImplUpsertTest {
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction)).build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
@@ -63,7 +63,7 @@ public class MutableSegmentImplUpsertTest {
             .getOrCreatePartitionManager(0);
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction), "secondsSinceEpoch",
+            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, hashFunction), "secondsSinceEpoch",
             _partitionUpsertMetadataManager);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 5335d51..471a46c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -36,10 +39,18 @@ public class PartialUpsertHandlerTest {
   @Test
   public void testMerge() {
     HelixManager helixManager = Mockito.mock(HelixManager.class);
+
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("field1", FieldSpec.DataType.LONG)
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
+        .setPrimaryKeyColumns(Arrays.asList("pk")).build();
+
     String realtimeTableName = "testTable_REALTIME";
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
-    PartialUpsertHandler handler = new PartialUpsertHandler(helixManager, realtimeTableName, partialUpsertStrategies);
+    PartialUpsertHandler handler =
+        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
+            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
 
     // both records are null.
     GenericRow previousRecord = new GenericRow();
@@ -61,13 +72,18 @@ public class PartialUpsertHandlerTest {
     assertEquals(newRecord.getValue("field1"), 2);
 
     // newRecord is default null value, while previousRecord is not.
+    // field1 should not be incremented since the newRecord is null.
+    // special case: field2 should be overrided by null value because we didn't enabled default partial upsert strategy.
     previousRecord.clear();
     incomingRecord.clear();
     previousRecord.putValue("field1", 1);
+    previousRecord.putValue("field2", 2);
     incomingRecord.putDefaultNullValue("field1", 2);
+    incomingRecord.putDefaultNullValue("field2", 0);
     newRecord = handler.merge(previousRecord, incomingRecord);
     assertFalse(newRecord.isNullValue("field1"));
     assertEquals(newRecord.getValue("field1"), 1);
+    assertTrue(newRecord.isNullValue("field2"));
 
     // neither of records is null.
     previousRecord.clear();
@@ -78,4 +94,57 @@ public class PartialUpsertHandlerTest {
     assertFalse(newRecord.isNullValue("field1"));
     assertEquals(newRecord.getValue("field1"), 3);
   }
+
+  @Test
+  public void testMergeWithDefaultPartialUpsertStrategy() {
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG)
+        .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
+        .setPrimaryKeyColumns(Arrays.asList("pk")).build();
+
+    String realtimeTableName = "testTable_REALTIME";
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
+    partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
+    PartialUpsertHandler handler =
+        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
+            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+
+    // previousRecord is null default value, while newRecord is not.
+    GenericRow previousRecord = new GenericRow();
+    GenericRow incomingRecord = new GenericRow();
+    previousRecord.putDefaultNullValue("field1", 1);
+    previousRecord.putDefaultNullValue("field2", 2);
+    incomingRecord.putValue("field1", 2);
+    incomingRecord.putValue("field2", 1);
+    GenericRow newRecord = handler.merge(previousRecord, incomingRecord);
+    assertFalse(newRecord.isNullValue("field1"));
+    assertEquals(newRecord.getValue("field1"), 2);
+    assertEquals(newRecord.getValue("field2"), 1);
+
+    // newRecord is default null value, while previousRecord is not.
+    // field1 should not be incremented since the newRecord is null.
+    // field2 should not be overrided by null value since we have default partial upsert strategy.
+    previousRecord.clear();
+    incomingRecord.clear();
+    previousRecord.putValue("field1", 8);
+    previousRecord.putValue("field2", 8);
+    incomingRecord.putDefaultNullValue("field1", 1);
+    incomingRecord.putDefaultNullValue("field2", 0);
+    newRecord = handler.merge(previousRecord, incomingRecord);
+    assertEquals(newRecord.getValue("field1"), 8);
+    assertEquals(newRecord.getValue("field2"), 8);
+
+    // neither of records is null.
+    previousRecord.clear();
+    incomingRecord.clear();
+    previousRecord.putValue("field1", 1);
+    previousRecord.putValue("field2", 100);
+    incomingRecord.putValue("field1", 2);
+    incomingRecord.putValue("field2", 1000);
+    newRecord = handler.merge(previousRecord, incomingRecord);
+    assertEquals(newRecord.getValue("field1"), 3);
+    assertEquals(newRecord.getValue("field2"), 1000);
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index eacb587..928a01a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -357,8 +357,7 @@ public class TableConfigUtilsTest {
 
     // input field name used as destination field
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")),
-                null))
+        new IngestionConfig(null, null, null, Lists.newArrayList(new TransformConfig("myCol", "reverse(myCol)")), null))
         .build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -505,14 +504,14 @@ public class TableConfigUtilsTest {
     TableConfigUtils.validate(tableConfig, schema);
 
     // 1 tier configs
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
             TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null))).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // 2 tier configs, case insensitive check
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), "tier2_tag_OFFLINE", null, null))).build();
@@ -527,8 +526,8 @@ public class TableConfigUtilsTest {
     TableConfigUtils.validate(tableConfig, schema);
 
     // tier name empty
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(
             new TierConfig("", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", TierFactory.PINOT_SERVER_STORAGE_TYPE,
                 "tier1_tag_OFFLINE", null, null))).build();
     try {
@@ -539,8 +538,8 @@ public class TableConfigUtilsTest {
     }
 
     // tier name repeats
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
             new TierConfig("sameTierName", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "100d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -552,8 +551,8 @@ public class TableConfigUtilsTest {
     }
 
     // segmentSelectorType invalid
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
             new TierConfig("tier2", "unsupportedSegmentSelector", "40d", TierFactory.PINOT_SERVER_STORAGE_TYPE,
                 "tier2_tag_OFFLINE", null, null))).build();
@@ -565,8 +564,8 @@ public class TableConfigUtilsTest {
     }
 
     // segmentAge not provided for TIME segmentSelectorType
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(
             new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
                 "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -578,8 +577,8 @@ public class TableConfigUtilsTest {
     }
 
     // segmentAge invalid
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "3600",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -592,11 +591,10 @@ public class TableConfigUtilsTest {
     }
 
     // storageType invalid
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(
-            new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType",
-                "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
-                TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType",
+            "tier1_tag_OFFLINE", null, null), new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
 
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -606,8 +604,8 @@ public class TableConfigUtilsTest {
     }
 
     // serverTag not provided for PINOT_SERVER storageType
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, null, null, null))).build();
@@ -619,8 +617,8 @@ public class TableConfigUtilsTest {
     }
 
     // serverTag invalid
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(
-        Lists.newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag", null, null),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier2_tag_OFFLINE", null, null))).build();
@@ -1046,7 +1044,7 @@ public class TableConfigUtilsTest {
         new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
             .build();
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1055,7 +1053,7 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1070,14 +1068,14 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
     } catch (IllegalStateException e) {
-      Assert.assertEquals(e.getMessage(),
-          "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME");
+      Assert
+          .assertEquals(e.getMessage(), "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + "_REALTIME");
     }
 
     Map<String, String> streamConfigs = getStreamConfigs();
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs)
-        .build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
+        .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1087,8 +1085,8 @@ public class TableConfigUtilsTest {
 
     streamConfigs.put("stream.kafka.consumer.type", "simple");
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs)
-        .build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
+        .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1098,16 +1096,15 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null))
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     TableConfigUtils.validateUpsertConfig(tableConfig, schema);
 
-    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null,
-        Collections.singletonList(
-            new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10);
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
+        .singletonList(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10);
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null))
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null))
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
@@ -1131,9 +1128,9 @@ public class TableConfigUtilsTest {
     Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
     partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
 
-    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null))
-        .setNullHandlingEnabled(false)
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(
+        new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
+            null)).setNullHandlingEnabled(false)
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     try {
@@ -1191,18 +1188,17 @@ public class TableConfigUtilsTest {
   @Test
   public void testTaskConfig() {
     Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
             .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-            .setPrimaryKeyColumns(Lists.newArrayList("myCol"))
-            .build();
-    Map<String, String> realtimeToOfflineTaskConfig =
-        ImmutableMap.of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType",
-            "rollup", "myCol.aggregationType", "max");
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    Map<String, String> realtimeToOfflineTaskConfig = ImmutableMap
+        .of("schedule", "0 */10 * ? * * *", "bucketTimePeriod", "6h", "bufferTimePeriod", "5d", "mergeType", "rollup",
+            "myCol.aggregationType", "max");
     Map<String, String> segmentGenerationAndPushTaskConfig = ImmutableMap.of("schedule", "0 */10 * ? * * *");
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(
-        new TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
-            "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
+        new TableTaskConfig(ImmutableMap
+            .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
+                segmentGenerationAndPushTaskConfig))).build();
 
     // validate valid config
     TableConfigUtils.validateTaskConfigs(tableConfig, schema);
@@ -1221,11 +1217,11 @@ public class TableConfigUtilsTest {
     }
 
     // invalid Upsert config with RealtimeToOfflineTask
-    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setTimeColumnName(TIME_COLUMN)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setTaskConfig(new TableTaskConfig(
-            ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
-                "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null)).setTaskConfig(
+            new TableTaskConfig(ImmutableMap
+                .of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask",
+                    segmentGenerationAndPushTaskConfig))).build();
     try {
       TableConfigUtils.validateTaskConfigs(tableConfig, schema);
       Assert.fail();
@@ -1293,8 +1289,8 @@ public class TableConfigUtilsTest {
     streamConfigs.put("streamType", "kafka");
     streamConfigs.put("stream.kafka.consumer.type", "highLevel");
     streamConfigs.put("stream.kafka.topic.name", "test");
-    streamConfigs.put("stream.kafka.decoder.class.name",
-        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
     return streamConfigs;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 9184bff..5afbe9c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -53,12 +53,16 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Partial update strategies.")
   private final Map<String, Strategy> _partialUpsertStrategies;
 
+  @JsonPropertyDescription("default upsert strategy for partial mode")
+  private final Strategy _defaultPartialUpsertStrategy;
+
   @JsonPropertyDescription("Column for upsert comparison, default to time column")
   private final String _comparisonColumn;
 
   @JsonCreator
   public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
       @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies,
+      @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy defaultPartialUpsertStrategy,
       @JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
       @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
     Preconditions.checkArgument(mode != null, "Upsert mode must be configured");
@@ -66,8 +70,11 @@ public class UpsertConfig extends BaseJsonConfig {
 
     if (mode == Mode.PARTIAL) {
       _partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>();
+      _defaultPartialUpsertStrategy =
+          defaultPartialUpsertStrategy != null ? defaultPartialUpsertStrategy : Strategy.OVERWRITE;
     } else {
       _partialUpsertStrategies = null;
+      _defaultPartialUpsertStrategy = null;
     }
 
     _comparisonColumn = comparisonColumn;
@@ -87,6 +94,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _partialUpsertStrategies;
   }
 
+  public Strategy getDefaultPartialUpsertStrategy() {
+    return _defaultPartialUpsertStrategy;
+  }
+
   public String getComparisonColumn() {
     return _comparisonColumn;
   }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 37b18a2..d7d53d2 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,18 +29,21 @@ public class UpsertConfigTest {
 
   @Test
   public void testUpsertConfig() {
-    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null);
+    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null);
     assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
 
-    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", null);
+    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", null);
     assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
 
-    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.MURMUR3);
+    upsertConfig1 =
+        new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", UpsertConfig.HashFunction.MURMUR3);
     assertEquals(upsertConfig1.getHashFunction(), UpsertConfig.HashFunction.MURMUR3);
 
     Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
     partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
-    UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null);
+    UpsertConfig upsertConfig2 =
+        new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
+            null);
     assertEquals(upsertConfig2.getPartialUpsertStrategies(), partialUpsertStratgies);
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org