You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/01 18:17:21 UTC
[incubator-pinot] branch master updated: Implement Append merger
for partial upsert (#7087)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0fb9209 Implement Append merger for partial upsert (#7087)
0fb9209 is described below
commit 0fb92090efd9ae7fed071c35b6e33069287c35ef
Author: deemoliu <qi...@uber.com>
AuthorDate: Thu Jul 1 11:17:06 2021 -0700
Implement Append merger for partial upsert (#7087)
Add Append and Union mergers for partial upsert
---
.../{IncrementMerger.java => AppendMerger.java} | 29 +++++-----
.../local/upsert/merger/IncrementMerger.java | 4 ++
.../local/upsert/merger/OverwriteMerger.java | 4 ++
.../upsert/merger/PartialUpsertMergerFactory.java | 12 +++-
.../{IncrementMerger.java => UnionMerger.java} | 33 ++++++-----
.../segment/local/utils/TableConfigUtils.java | 9 ++-
.../upsert/merger/PartialUpsertMergerTest.java | 59 +++++++++++++++++++
.../segment/local/utils/TableConfigUtilsTest.java | 9 +++
.../pinot/spi/config/table/UpsertConfig.java | 4 +-
.../pinot/tools/PartialUpsertQuickStart.java | 23 ++++++--
...t_partial_meetupRsvp_realtime_table_config.json | 4 +-
.../upsert_partial_meetupRsvp_schema.json | 67 ++++++++++++++++++++++
12 files changed, 219 insertions(+), 38 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
similarity index 57%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
index 6d38ae2..3ed7c90 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/AppendMerger.java
@@ -18,27 +18,28 @@
*/
package org.apache.pinot.segment.local.upsert.merger;
-public class IncrementMerger implements PartialUpsertMerger {
- IncrementMerger() {
+/**
+ * Merges 2 records and returns the merged record.
+ * Appends the values from the incoming row to the existing values of the multi-value field, then returns the merged record.
+ * The append merger keeps duplicate values in the multi-value field.
+ */
+public class AppendMerger implements PartialUpsertMerger {
+ AppendMerger() {
}
/**
- * Increment the new value from incoming row to the given field of previous record.
+ * Append the new value from incoming row to the given multi-value field of previous record.
*/
@Override
public Object merge(Object previousValue, Object currentValue) {
- return addNumbers((Number) previousValue, (Number) currentValue);
+ return append((Object[]) previousValue, (Object[]) currentValue);
}
- private static Number addNumbers(Number a, Number b) {
- if (a instanceof Integer) {
- return (Integer) a + (Integer) b;
- } else if (a instanceof Long) {
- return (Long) a + (Long) b;
- } else if (a instanceof Float) {
- return (Float) a + (Float) b;
- } else {
- return (Double) a + (Double) b;
- }
+ private static Object append(Object[] a, Object[] b) {
+ Object[] merged = new Object[a.length + b.length];
+
+ System.arraycopy(a, 0, merged, 0, a.length);
+ System.arraycopy(b, 0, merged, a.length, b.length);
+ return merged;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
index 6d38ae2..34b4d01 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.segment.local.upsert.merger;
+/**
+ * Merges 2 records and returns the merged record.
+ * Adds the new value from the incoming row to the existing value of the numeric field, then returns the merged record.
+ */
public class IncrementMerger implements PartialUpsertMerger {
IncrementMerger() {
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
index a317d45..4879567 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/OverwriteMerger.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.segment.local.upsert.merger;
+/**
+ * Merges 2 records and returns the merged record.
+ * Overwrites the existing value of the given field with the incoming value, then returns the merged record.
+ */
public class OverwriteMerger implements PartialUpsertMerger {
OverwriteMerger() {
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 36112c4..47fae1c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -25,15 +25,21 @@ public class PartialUpsertMergerFactory {
private PartialUpsertMergerFactory() {
}
- private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+ private static final AppendMerger APPEND_MERGER = new AppendMerger();
private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+ private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+ private static final UnionMerger UNION_MERGER = new UnionMerger();
public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
switch (strategy) {
- case OVERWRITE:
- return OVERWRITE_MERGER;
+ case APPEND:
+ return APPEND_MERGER;
case INCREMENT:
return INCREMENT_MERGER;
+ case OVERWRITE:
+ return OVERWRITE_MERGER;
+ case UNION:
+ return UNION_MERGER;
default:
throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
similarity index 55%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
index 6d38ae2..0abcb23 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/UnionMerger.java
@@ -18,27 +18,34 @@
*/
package org.apache.pinot.segment.local.upsert.merger;
-public class IncrementMerger implements PartialUpsertMerger {
- IncrementMerger() {
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Merges 2 records and returns the merged record.
+ * Unions the values from the incoming row with the existing values of the multi-value field, then returns the merged record.
+ * The union merger removes duplicate values; the merged values are returned in sorted (natural) order.
+ */
+public class UnionMerger implements PartialUpsertMerger {
+ UnionMerger() {
}
/**
- * Increment the new value from incoming row to the given field of previous record.
+ * Union the new value from incoming row to the given multi-value field of previous record.
*/
@Override
public Object merge(Object previousValue, Object currentValue) {
- return addNumbers((Number) previousValue, (Number) currentValue);
+ return union((Object[]) previousValue, (Object[]) currentValue);
}
- private static Number addNumbers(Number a, Number b) {
- if (a instanceof Integer) {
- return (Integer) a + (Integer) b;
- } else if (a instanceof Long) {
- return (Long) a + (Long) b;
- } else if (a instanceof Float) {
- return (Float) a + (Float) b;
- } else {
- return (Double) a + (Double) b;
+ private static Object union(Object[] a, Object[] b) {
+ Set<Object> union = new TreeSet<>();
+ for (Object value: a) {
+ union.add(value);
+ }
+ for (Object value: b) {
+ union.add(value);
}
+ return union.toArray();
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a57ca84..63a59a0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -354,6 +354,8 @@ public final class TableConfigUtils {
* - Merger cannot be applied to private key columns
* - Merger cannot be applied to non-existing columns
* - INCREMENT merger must be applied to numeric columns
+ * - APPEND/UNION merger cannot be applied to single-value columns
+ * - INCREMENT merger cannot be applied to date time columns
*/
@VisibleForTesting
static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema schema) {
@@ -371,16 +373,21 @@ public final class TableConfigUtils {
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
String column = entry.getKey();
+ UpsertConfig.Strategy columnStrategy = entry.getValue();
Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns");
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
Preconditions.checkState(fieldSpec != null, "Merger cannot be applied to non-existing column: %s", column);
- if (entry.getValue() == UpsertConfig.Strategy.INCREMENT) {
+ if (columnStrategy == UpsertConfig.Strategy.INCREMENT) {
Preconditions.checkState(fieldSpec.getDataType().getStoredType().isNumeric(),
"INCREMENT merger cannot be applied to non-numeric column: %s", column);
Preconditions.checkState(!schema.getDateTimeNames().contains(column),
"INCREMENT merger cannot be applied to date time column: %s", column);
+ } else if (columnStrategy == UpsertConfig.Strategy.APPEND || columnStrategy == UpsertConfig.Strategy.UNION) {
+ Preconditions
+ .checkState(!fieldSpec.isSingleValueField(), "%s merger cannot be applied to single-value column: %s",
+ columnStrategy.toString(), column);
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
new file mode 100644
index 0000000..311c2d4
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class PartialUpsertMergerTest {
+
+ @Test
+ public void testAppendMergers() {
+ AppendMerger appendMerger = new AppendMerger();
+
+ Integer array1[] = {1, 2, 3};
+ Integer array2[] = {3, 4, 6};
+
+ assertEquals(new Integer[]{1, 2, 3, 3, 4, 6}, appendMerger.merge(array1, array2));
+ }
+
+ @Test
+ public void testIncrementMergers() {
+ IncrementMerger incrementMerger = new IncrementMerger();
+ assertEquals(3, incrementMerger.merge(1, 2));
+ }
+
+ @Test
+ public void testOverwriteMergers() {
+ OverwriteMerger overwriteMerger = new OverwriteMerger();
+ assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue"));
+ }
+
+ @Test
+ public void testUnionMergers() {
+ UnionMerger unionMerger = new UnionMerger();
+
+ String array1[] = {"a", "b", "c"};
+ String array2[] = {"c", "d", "e"};
+
+ assertEquals(new String[]{"a", "b", "c", "d", "e"}, unionMerger.merge(array1, array2));
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 6d76c88..d25bc62 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1157,6 +1157,15 @@ public class TableConfigUtilsTest {
}
partialUpsertStratgies.clear();
+ partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.APPEND);
+ try {
+ TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "APPEND merger cannot be applied to single-value column: myCol2");
+ }
+
+ partialUpsertStratgies.clear();
partialUpsertStratgies.put("myTimeCol", UpsertConfig.Strategy.INCREMENT);
try {
TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index cf3e841..74cc876 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -33,8 +33,8 @@ public class UpsertConfig extends BaseJsonConfig {
}
public enum Strategy {
- // Todo: add APPEND, CUSTOM strategies
- OVERWRITE, INCREMENT
+ // Todo: add CUSTOM strategies
+ APPEND, INCREMENT, OVERWRITE, UNION
}
private final Mode _mode;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
index b77cd5d..c9bb919 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PartialUpsertQuickStart.java
@@ -57,7 +57,7 @@ public class PartialUpsertQuickStart {
File tableConfigFile = new File(bootstrapTableDir, "meetupRsvp_realtime_table_config.json");
ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_meetupRsvp_schema.json");
+ URL resource = classLoader.getResource("examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, schemaFile);
resource =
@@ -101,19 +101,34 @@ public class PartialUpsertQuickStart {
Thread.sleep(15000);
printStatus(Color.YELLOW, "***** Upsert quickstart setup complete *****");
- printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****");
- printStatus(Color.YELLOW,
- "***** The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated. *****");
// The expected behavior for total number of documents per PK should be 1.
// The expected behavior for total number of rsvp_counts per PK should >=1 since it's incremented and updated.
+ // The expected behavior for the number of values in the venue_name field should equal rsvp_count.
String q1 =
"select event_id, count(*), sum(rsvp_count) from meetupRsvp group by event_id order by sum(rsvp_count) desc limit 10";
printStatus(Color.YELLOW, "Total number of documents, total number of rsvp_counts per event_id in the table");
+ printStatus(Color.YELLOW, "***** The expected behavior for total number of documents per PK should be 1 *****");
+ printStatus(Color.YELLOW,
+ "***** The expected behavior for total number of rsvp_counts per PK should be >=1 since it's incremented and updated. *****");
+ printStatus(Color.YELLOW,
+ "***** The expected behavior for the number of values in the venue_name field should equal rsvp_count. *****");
printStatus(Color.CYAN, "Query : " + q1);
printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q1)));
printStatus(Color.GREEN, "***************************************************");
+ // group_name (UNION) should hold at most rsvp_count distinct values; venue_name (APPEND) should hold exactly rsvp_count values.
+ String q2 =
+ "select event_id, group_name, venue_name, rsvp_count from meetupRsvp where rsvp_count > 1 order by rsvp_count desc limit 10";
+ printStatus(Color.YELLOW, "Event_id, group_name, venue_name, rsvp_count per event_id in the table");
+ printStatus(Color.YELLOW,
+ "***** The number of values in group_name should be less than or equal to rsvp_count. Duplicate values are not allowed. *****");
+ printStatus(Color.YELLOW,
+ "***** The number of values in venue_name should equal rsvp_count. Duplicate values are allowed. *****");
+ printStatus(Color.CYAN, "Query : " + q2);
+ printStatus(Color.YELLOW, prettyPrintResponse(runner.runQuery(q2)));
+ printStatus(Color.GREEN, "***************************************************");
+
printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
}
}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
index 62eb14a..18b01ca 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_realtime_table_config.json
@@ -47,7 +47,9 @@
"upsertConfig": {
"mode": "PARTIAL",
"partialUpsertStrategies":{
- "rsvp_count": "INCREMENT"
+ "rsvp_count": "INCREMENT",
+ "group_name": "UNION",
+ "venue_name": "APPEND"
}
}
}
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
new file mode 100644
index 0000000..d418960
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_partial_meetupRsvp_schema.json
@@ -0,0 +1,67 @@
+{
+ "metricFieldSpecs": [
+ {
+ "dataType": "INT",
+ "name": "rsvp_count"
+ }
+ ],
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "STRING",
+ "name": "venue_name",
+ "singleValueField": false
+ },
+ {
+ "dataType": "STRING",
+ "name": "event_name"
+ },
+ {
+ "dataType": "STRING",
+ "name": "event_id"
+ },
+ {
+ "dataType": "TIMESTAMP",
+ "name": "event_time"
+ },
+ {
+ "dataType": "STRING",
+ "name": "group_city"
+ },
+ {
+ "dataType": "STRING",
+ "name": "group_country"
+ },
+ {
+ "dataType": "LONG",
+ "name": "group_id"
+ },
+ {
+ "dataType": "STRING",
+ "name": "group_name",
+ "singleValueField": false
+ },
+ {
+ "dataType": "DOUBLE",
+ "name": "group_lat"
+ },
+ {
+ "dataType": "DOUBLE",
+ "name": "group_lon"
+ },
+ {
+ "dataType": "BYTES",
+ "name": "location",
+ "transformFunction": "toSphericalGeography(stPoint(group_lon,group_lat))"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "mtime",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:MILLISECONDS"
+ }
+ ],
+ "schemaName": "meetupRsvp",
+ "primaryKeyColumns": ["event_id"]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org