You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:16:52 UTC

[GitHub] [incubator-pinot] deemoliu commented on a change in pull request #6899: Add partial upsert config and mergers

deemoliu commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r650307998



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+import org.apache.pinot.spi.config.table.UpsertConfig;
+
+
+public class PartialUpsertMergerFactory {
+  private PartialUpsertMergerFactory() {
+  }
+
+  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+
+  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
+    switch (strategy) {
+      case OVERWRITE:
+        return OVERWRITE_MERGER;
+      case INCREMENT:
+        return INCREMENT_MERGER;

Review comment:
       Hi @yupeng9, I added a merger test for null values here: https://github.com/apache/incubator-pinot/pull/6899/commits/ec409cb739ffbe42b424003596cf9443a31d8ebf

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
##########
@@ -30,16 +32,37 @@
     FULL, PARTIAL, NONE
   }
 
+  public enum Strategy {
+    OVERWRITE, INCREMENT

Review comment:
       Got it; resolved in https://github.com/apache/incubator-pinot/pull/6899/commits/61356a46850d3c0eeb9313845ec77d10769486ba

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -350,6 +357,36 @@ public static void validateUpsertConfig(TableConfig tableConfig, Schema schema)
             .getIndexingConfig().isEnableDefaultStarTree(), "The upsert table cannot have star-tree index.");
   }
 
+  /**
+   * Validates the partial upsert-related configurations
+   *  - INCREMENT merger cannot be applied to PK.
+   *  - INCREMENT merger should be numeric data types.
+   *  - enforce nullValueHandling for partial upsert tables.
+   */
+  private static void validatePartialUpsertStrategies(Schema schema, TableConfig tableConfig) {
+    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
+      return;
+    }
+
+    Preconditions.checkState(tableConfig.getIndexingConfig().isNullHandlingEnabled(),
+        "NullValueHandling is required to be enabled for partial upsert tables.");
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies =
+        tableConfig.getUpsertConfig().getPartialUpsertStrategies();
+
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
+      Set<FieldSpec.DataType> numericsDataType = new HashSet<>(Arrays.asList(INT, LONG, FLOAT, DOUBLE));

Review comment:
       Thanks @yupeng9, I added another validation rule to prevent date-time fields with numeric data types from being "incremented".
   https://github.com/apache/incubator-pinot/pull/6899/commits/372e8e639b18418917f0e965cc9b6e2f189c7c02
   

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -984,17 +989,19 @@ public void testValidateIndexingConfig() {
       // Expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setVarLengthDictionaryColumns(Arrays.asList("intCol")).
-        build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setVarLengthDictionaryColumns(Arrays.asList("intCol")).

Review comment:
       Thanks @yupeng9 @Jackie-Jiang, my IDE formatter might not be configured correctly. Is there a style guide or formatter configuration for this project?

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
       Assert.assertEquals(e.getMessage(), "The upsert table cannot have star-tree index.");
     }
   }
+
+  @Test
+  public void testValidatePartialUpsertConfig() {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1", FieldSpec.DataType.LONG)
+            .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be enabled for partial upsert tables.");
+    }
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied to PK.");
+    }
+
+    partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric data types.");

Review comment:
       Thanks @yupeng9 for pointing this out. I don't see a use case that requires incrementing a timestamp for now. Please let me know if any use case requires this.
   Since time columns can use numeric data types, I added another validation rule to prevent numeric timestamp values from being incremented.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org