You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/01/15 02:20:34 UTC

[pinot] branch master updated: Add ignoreMerger for partial upsert (#7907)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec47c4  Add ignoreMerger for partial upsert (#7907)
7ec47c4 is described below

commit 7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b
Author: deemoliu <qi...@uber.com>
AuthorDate: Fri Jan 14 18:20:18 2022 -0800

    Add ignoreMerger for partial upsert (#7907)
---
 .../segment/local/upsert/PartialUpsertHandler.java |  8 ++++++
 ...lUpsertMergerFactory.java => IgnoreMerger.java} | 32 ++++++----------------
 .../upsert/merger/PartialUpsertMergerFactory.java  |  3 ++
 .../upsert/merger/PartialUpsertMergerTest.java     |  7 +++++
 .../pinot/spi/config/table/UpsertConfig.java       |  2 +-
 5 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 4822339..7bc6e6a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -119,6 +119,14 @@ public class PartialUpsertHandler {
 
   /**
    * Merges 2 records and returns the merged record.
+   * A map is used to indicate all fields configured for partial upsert. For these fields:
+   * (1) If the prev value is null, return the new value.
+   * (2) If the prev value is not null and the new value is null, return the prev value.
+   * (3) If neither value is null, merge the values and return the result.
+   * Un-configured fields use the default override behavior, regardless of null values.
+   *
+   * For example, overwrite merger will only override the prev value if the new value is not null.
+   * Null values override existing values for un-configured fields; configure ignoreMerger to ignore them.
    *
    * @param previousRecord the last derived full record during ingestion.
    * @param newRecord the new consumed record.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
similarity index 50%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
index 47fae1c..c9bb16f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IgnoreMerger.java
@@ -18,30 +18,16 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
-import org.apache.pinot.spi.config.table.UpsertConfig;
-
-
-public class PartialUpsertMergerFactory {
-  private PartialUpsertMergerFactory() {
+/**
+ * Merges 2 values and returns the merged value.
+ * Always ignores the new value from the incoming row and returns the previous value.
+ */
+public class IgnoreMerger implements PartialUpsertMerger {
+  IgnoreMerger() {
   }
 
-  private static final AppendMerger APPEND_MERGER = new AppendMerger();
-  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
-  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
-  private static final UnionMerger UNION_MERGER = new UnionMerger();
-
-  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
-    switch (strategy) {
-      case APPEND:
-        return APPEND_MERGER;
-      case INCREMENT:
-        return INCREMENT_MERGER;
-      case OVERWRITE:
-        return OVERWRITE_MERGER;
-      case UNION:
-        return UNION_MERGER;
-      default:
-        throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
-    }
+  @Override
+  public Object merge(Object previousValue, Object currentValue) {
+    return previousValue;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 47fae1c..23dc985 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -27,6 +27,7 @@ public class PartialUpsertMergerFactory {
 
   private static final AppendMerger APPEND_MERGER = new AppendMerger();
   private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+  private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
   private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
   private static final UnionMerger UNION_MERGER = new UnionMerger();
 
@@ -36,6 +37,8 @@ public class PartialUpsertMergerFactory {
         return APPEND_MERGER;
       case INCREMENT:
         return INCREMENT_MERGER;
+      case IGNORE:
+        return IGNORE_MERGER;
       case OVERWRITE:
         return OVERWRITE_MERGER;
       case UNION:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
index 002b4a4..7a2c398 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
@@ -42,6 +42,13 @@ public class PartialUpsertMergerTest {
   }
 
   @Test
+  public void testIgnoreMergers() {
+    IgnoreMerger ignoreMerger = new IgnoreMerger();
+    assertEquals(null, ignoreMerger.merge(null, 3));
+    assertEquals(3, ignoreMerger.merge(3, null));
+  }
+
+  @Test
   public void testOverwriteMergers() {
     OverwriteMerger overwriteMerger = new OverwriteMerger();
     assertEquals("newValue", overwriteMerger.merge("oldValue", "newValue"));
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 7e0be94..9184bff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -37,7 +37,7 @@ public class UpsertConfig extends BaseJsonConfig {
 
   public enum Strategy {
     // Todo: add CUSTOM strategies
-    APPEND, INCREMENT, OVERWRITE, UNION
+    APPEND, IGNORE, INCREMENT, OVERWRITE, UNION
   }
 
   public enum HashFunction {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org