You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/01/15 02:20:34 UTC
[pinot] branch master updated: Add ignoreMerger for partial upsert (#7907)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec47c4 Add ignoreMerger for partial upsert (#7907)
7ec47c4 is described below
commit 7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b
Author: deemoliu <qi...@uber.com>
AuthorDate: Fri Jan 14 18:20:18 2022 -0800
Add ignoreMerger for partial upsert (#7907)
---
.../segment/local/upsert/PartialUpsertHandler.java | 8 ++++++
...lUpsertMergerFactory.java => IgnoreMerger.java} | 32 ++++++----------------
.../upsert/merger/PartialUpsertMergerFactory.java | 3 ++
.../upsert/merger/PartialUpsertMergerTest.java | 7 +++++
.../pinot/spi/config/table/UpsertConfig.java | 2 +-
5 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 4822339..7bc6e6a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -119,6 +119,14 @@ public class PartialUpsertHandler {
/**
* Merges 2 records and returns the merged record.
+ * A map is used to indicate all fields configured for partial upsert. For these fields:
+ * (1) If the prev value is null, return the new value.
+ * (2) If the prev value is not null and the new value is null, return the prev value.
+ * (3) If neither value is null, merge the two values and return the result.
+ * Un-configured fields use the default overwrite behavior, regardless of null values.
+ *
+ * For example, overwrite merger will only override the prev value if the new value is not null.
+ * Null values will override existing values for un-configured fields. They can be ignored by using ignoreMerger.
*
* @param previousRecord the last derived full record during ingestion.
* @param newRecord the new consumed record.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
similarity index 50%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
index 47fae1c..c9bb16f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
@@ -18,30 +18,16 @@
*/
package org.apache.pinot.segment.local.upsert.merger;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-
-
-public class PartialUpsertMergerFactory {
- private PartialUpsertMergerFactory() {
+/**
+ * Merges 2 values and returns the merged value.
+ * The new value from the incoming row is always ignored, and the previous value is returned.
+ */
+public class IgnoreMerger implements PartialUpsertMerger {
+ IgnoreMerger() {
}
- private static final AppendMerger APPEND_MERGER = new AppendMerger();
- private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
- private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
- private static final UnionMerger UNION_MERGER = new UnionMerger();
-
- public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
- switch (strategy) {
- case APPEND:
- return APPEND_MERGER;
- case INCREMENT:
- return INCREMENT_MERGER;
- case OVERWRITE:
- return OVERWRITE_MERGER;
- case UNION:
- return UNION_MERGER;
- default:
- throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
- }
+ @Override
+ public Object merge(Object previousValue, Object currentValue) {
+ return previousValue;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 47fae1c..23dc985 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -27,6 +27,7 @@ public class PartialUpsertMergerFactory {
private static final AppendMerger APPEND_MERGER = new AppendMerger();
private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+ private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
private static final UnionMerger UNION_MERGER = new UnionMerger();
@@ -36,6 +37,8 @@ public class PartialUpsertMergerFactory {
return APPEND_MERGER;
case INCREMENT:
return INCREMENT_MERGER;
+ case IGNORE:
+ return IGNORE_MERGER;
case OVERWRITE:
return OVERWRITE_MERGER;
case UNION:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
index 002b4a4..7a2c398 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
@@ -42,6 +42,13 @@ public class PartialUpsertMergerTest {
}
@Test
+ public void testIgnoreMergers() {
+ IgnoreMerger ignoreMerger = new IgnoreMerger();
+ assertEquals(null, ignoreMerger.merge(null, 3));
+ assertEquals(3, ignoreMerger.merge(3, null));
+ }
+
+ @Test
public void testOverwriteMergers() {
OverwriteMerger overwriteMerger = new OverwriteMerger();
assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue"));
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 7e0be94..9184bff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -37,7 +37,7 @@ public class UpsertConfig extends BaseJsonConfig {
public enum Strategy {
// Todo: add CUSTOM strategies
- APPEND, INCREMENT, OVERWRITE, UNION
+ APPEND, IGNORE, INCREMENT, OVERWRITE, UNION
}
public enum HashFunction {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org