You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/01 18:17:21 UTC

[incubator-pinot] branch master updated: Implement Append merger for partial upsert (#7087)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fb9209  Implement Append merger for partial upsert (#7087)
0fb9209 is described below

commit 0fb92090efd9ae7fed071c35b6e33069287c35ef
Author: deemoliu <qi...@uber.com>
AuthorDate: Thu Jul 1 11:17:06 2021 -0700

    Implement Append merger for partial upsert (#7087)
    
    Add Append and Union merger for partial upsert
---
 .../{IncrementMerger.java => AppendMerger.java}    | 29 +++++-----
 .../local/upsert/merger/IncrementMerger.java       |  4 ++
 .../local/upsert/merger/OverwriteMerger.java       |  4 ++
 .../upsert/merger/PartialUpsertMergerFactory.java  | 12 +++-
 .../{IncrementMerger.java => UnionMerger.java}     | 33 ++++++-----
 .../segment/local/utils/TableConfigUtils.java      |  9 ++-
 .../upsert/merger/PartialUpsertMergerTest.java     | 59 +++++++++++++++++++
 .../segment/local/utils/TableConfigUtilsTest.java  |  9 +++
 .../pinot/spi/config/table/UpsertConfig.java       |  4 +-
 .../pinot/tools/PartialUpsertQuickStart.java       | 23 ++++++--
 ...t_partial_meetupRsvp_realtime_table_config.json |  4 +-
 .../upsert_partial_meetupRsvp_schema.json          | 67 ++++++++++++++++++++++
 12 files changed, 219 insertions(+), 38 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
similarity index 57%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
index 6d38ae2..3ed7c90 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
@@ -18,27 +18,28 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
-public class IncrementMerger implements PartialUpsertMerger {
-  IncrementMerger() {
+/**
+ * Merges 2 records and returns the merged record.
+ * Appends the incoming row's values to the existing values of the multi-value field and returns the result.
+ * The append merger keeps duplicate values in the multi-value field.
+ */
+public class AppendMerger implements PartialUpsertMerger {
+  AppendMerger() {
   }
 
   /**
-   * Increment the new value from incoming row to the given field of previous record.
+   * Append the new value from incoming row to the given multi-value field of previous record.
    */
   @Override
   public Object merge(Object previousValue, Object currentValue) {
-    return addNumbers((Number) previousValue, (Number) currentValue);
+    return append((Object[]) previousValue, (Object[]) currentValue);
   }
 
-  private static Number addNumbers(Number a, Number b) {
-    if (a instanceof Integer) {
-      return (Integer) a + (Integer) b;
-    } else if (a instanceof Long) {
-      return (Long) a + (Long) b;
-    } else if (a instanceof Float) {
-      return (Float) a + (Float) b;
-    } else {
-      return (Double) a + (Double) b;
-    }
+  private static Object append(Object[] a, Object[] b) {
+    Object[] merged = new Object[a.length + b.length];
+
+    System.arraycopy(a, 0, merged, 0, a.length);
+    System.arraycopy(b, 0, merged, a.length, b.length);
+    return merged;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
index 6d38ae2..34b4d01 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+/**
+ * Merges 2 records and returns the merged record.
+ * Add the new value from incoming row to the existing value from numeric field. Then return the merged record.
+ */
 public class IncrementMerger implements PartialUpsertMerger {
   IncrementMerger() {
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
index a317d45..4879567 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+/**
+ * Merges 2 records and returns the merged record.
+ * Overwrite the existing value for the given field. Then return the merged record.
+ */
 public class OverwriteMerger implements PartialUpsertMerger {
   OverwriteMerger() {
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 36112c4..47fae1c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -25,15 +25,21 @@ public class PartialUpsertMergerFactory {
   private PartialUpsertMergerFactory() {
   }
 
-  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final AppendMerger APPEND_MERGER = new AppendMerger();
   private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final UnionMerger UNION_MERGER = new UnionMerger();
 
   public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
     switch (strategy) {
-      case OVERWRITE:
-        return OVERWRITE_MERGER;
+      case APPEND:
+        return APPEND_MERGER;
       case INCREMENT:
         return INCREMENT_MERGER;
+      case OVERWRITE:
+        return OVERWRITE_MERGER;
+      case UNION:
+        return UNION_MERGER;
       default:
         throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
similarity index 55%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
index 6d38ae2..0abcb23 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
@@ -18,27 +18,34 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
-public class IncrementMerger implements PartialUpsertMerger {
-  IncrementMerger() {
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Merges 2 records and returns the merged record.
+ * Adds the incoming row's values to the existing values of the multi-value field and returns the result.
+ * The union merger removes duplicate values; the merged values come back in natural (sorted) order.
+ */
+public class UnionMerger implements PartialUpsertMerger {
+  UnionMerger() {
   }
 
   /**
-   * Increment the new value from incoming row to the given field of previous record.
+   * Union the new value from incoming row to the given multi-value field of previous record.
    */
   @Override
   public Object merge(Object previousValue, Object currentValue) {
-    return addNumbers((Number) previousValue, (Number) currentValue);
+    return union((Object[]) previousValue, (Object[]) currentValue);
   }
 
-  private static Number addNumbers(Number a, Number b) {
-    if (a instanceof Integer) {
-      return (Integer) a + (Integer) b;
-    } else if (a instanceof Long) {
-      return (Long) a + (Long) b;
-    } else if (a instanceof Float) {
-      return (Float) a + (Float) b;
-    } else {
-      return (Double) a + (Double) b;
+  private static Object union(Object[] a, Object[] b) {
+    Set<Object> union = new TreeSet<>();
+    for (Object value: a) {
+      union.add(value);
+    }
+    for (Object value: b) {
+      union.add(value);
     }
+    return union.toArray();
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a57ca84..63a59a0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -354,6 +354,8 @@ public final class TableConfigUtils {
    *  - Merger cannot be applied to private key columns
    *  - Merger cannot be applied to non-existing columns
    *  - INCREMENT merger must be applied to numeric columns
+   *  - APPEND/UNION merger cannot be applied to single-value columns
+   *  - INCREMENT merger cannot be applied to date time column
    */
   @VisibleForTesting
   static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema schema) {
@@ -371,16 +373,21 @@ public final class TableConfigUtils {
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
       String column = entry.getKey();
+      UpsertConfig.Strategy columnStrategy = entry.getValue();
       Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns");
 
       FieldSpec fieldSpec = schema.getFieldSpecFor(column);
       Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column);
 
-      if (entry.getValue() == UpsertConfig.Strategy.INCREMENT) {
+      if (columnStrategy == UpsertConfig.Strategy.INCREMENT) {
         Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(),
             "INCREMENT merger cannot be applied to non-numeric column: %s", column);
         Preconditions.checkState(!schema.getDateTimeNames().contains(column),
             "INCREMENT merger cannot be applied to date time column: %s", column);
+      } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) {
+        Preconditions
+            .checkState(!fieldSpec.isSingleValueField(), "%s merger cannot be applied to single-value column: %s",
+                columnStrategy.toString(), column);
       }
     }
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
new file mode 100644
index 0000000..311c2d4
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class PartialUpsertMergerTest {
+
+  @Test
+  public void testAppendMergers() {
+    AppendMerger appendMerger = new AppendMerger();
+
+    Integer array1[] = {1, 2, 3};
+    Integer array2[] = {3, 4, 6};
+
+    assertEquals(new Integer[]{1, 2, 3, 3, 4, 6}, appendMerger.merge(array1, array2));
+  }
+
+  @Test
+  public void testIncrementMergers() {
+    IncrementMerger incrementMerger = new IncrementMerger();
+    assertEquals(3, incrementMerger.merge(1, 2));
+  }
+
+  @Test
+  public void testOverwriteMergers() {
+    OverwriteMerger overwriteMerger = new OverwriteMerger();
+    assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue"));
+  }
+
+  @Test
+  public void testUnionMergers() {
+    UnionMerger unionMerger = new UnionMerger();
+
+    String array1[] = {"a", "b", "c"};
+    String array2[] = {"c", "d", "e"};
+
+    assertEquals(new String[]{"a", "b", "c", "d", "e"}, unionMerger.merge(array1, array2));
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 6d76c88..d25bc62 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1157,6 +1157,15 @@ public class TableConfigUtilsTest {
     }
 
     partialUpsertStratgies.clear();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.APPEND);
+    try {
+      TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "APPEND merger cannot be applied to single-value column: myCol2");
+    }
+
+    partialUpsertStratgies.clear();
     partialUpsertStratgies.put("myTimeCol", UpsertConfig.Strategy.INCREMENT);
     try {
       TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index cf3e841..74cc876 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -33,8 +33,8 @@ public class UpsertConfig extends BaseJsonConfig {
   }
 
   public enum Strategy {
-    // Todo: add APPEND, CUSTOM strategies
-    OVERWRITE, INCREMENT
+    // Todo: add CUSTOM strategies
+    APPEND, INCREMENT, OVERWRITE, UNION
   }
 
   private final Mode _mode;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index b77cd5d..c9bb919 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -57,7 +57,7 @@ public class PartialUpsertQuickStart {
     File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
+    URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json");
     Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
     resource =
@@ -101,19 +101,34 @@ public class PartialUpsertQuickStart {
     Thread.sleep(15000);
 
     printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
-    printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****");
-    printStatus(Color.YELLOW,
-        "***** The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated. *****");
 
     // The expected behavior for total number of documents per PK should be 1.
     // The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated.
+    // The expected behavior for the number of values in the venue_name field should equal rsvp_count.
     String q1 =
         "select event_id, count(*), sum(rsvp_count) from meetupRsvp group by event_id order by sum(rsvp_count) desc limit 10";
     printStatus(Color.YELLOW, "Total number of documents, total number of rsvp_counts per event_id in the table");
+    printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****");
+    printStatus(Color.YELLOW,
+        "***** The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated. *****");
+    printStatus(Color.YELLOW,
+        "***** The expected behavior for the number of values in the venue_name field should equal rsvp_count. *****");
     printStatus(Color.CYAN, "Query : " + q1);
     printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
     printStatus(Color.GREEN, "***************************************************");
 
+    // group_name (UNION) should hold <= rsvp_count values; venue_name (APPEND) should hold exactly rsvp_count values.
+    String q2 =
+        "select event_id, group_name, venue_name, rsvp_count from meetupRsvp where rsvp_count > 1 order by rsvp_count desc limit 10";
+    printStatus(Color.YELLOW, "Event_id, group_name, venue_name, rsvp_count per event_id in the table");
+    printStatus(Color.YELLOW,
+        "***** The number of values in group_name should be less than or equal to rsvp_count. Duplicates are not allowed. *****");
+    printStatus(Color.YELLOW,
+        "***** The number of values in venue_name should equal rsvp_count. Duplicates are allowed. *****");
+    printStatus(Color.CYAN, "Query : " + q2);
+    printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q2)));
+    printStatus(Color.GREEN, "***************************************************");
+
     printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
   }
 }
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
index 62eb14a..18b01ca 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
@@ -47,7 +47,9 @@
   "upsertConfig": {
     "mode": "PARTIAL",
     "partialUpsertStrategies":{
-      "rsvp_count": "INCREMENT"
+      "rsvp_count": "INCREMENT",
+      "group_name": "UNION",
+      "venue_name": "APPEND"
     }
   }
 }
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
new file mode 100644
index 0000000..d418960
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
@@ -0,0 +1,67 @@
+{
+  "metricFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "rsvp_count"
+    }
+  ],
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "STRING",
+      "name": "venue_name",
+      "singleValueField": false
+    },
+    {
+      "dataType": "STRING",
+      "name": "event_name"
+    },
+    {
+      "dataType": "STRING",
+      "name": "event_id"
+    },
+    {
+      "dataType": "TIMESTAMP",
+      "name": "event_time"
+    },
+    {
+      "dataType": "STRING",
+      "name": "group_city"
+    },
+    {
+      "dataType": "STRING",
+      "name": "group_country"
+    },
+    {
+      "dataType": "LONG",
+      "name": "group_id"
+    },
+    {
+      "dataType": "STRING",
+      "name": "group_name",
+      "singleValueField": false
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "group_lat"
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "group_lon"
+    },
+    {
+      "dataType": "BYTES",
+      "name": "location",
+      "transformFunction": "toSphericalGeography(stPoint(group_lon,group_lat))"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "mtime",
+      "dataType": "TIMESTAMP",
+      "format": "1:MILLISECONDS:TIMESTAMP",
+      "granularity": "1:MILLISECONDS"
+    }
+  ],
+  "schemaName": "meetupRsvp",
+  "primaryKeyColumns": ["event_id"]
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org