You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/22 23:34:07 UTC

[pinot] branch master updated: Add max merger and min mergers for partial upsert (#9665)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d62a867d46 Add max merger and min mergers for partial upsert (#9665)
d62a867d46 is described below

commit d62a867d46433567f7e2c488745013a6a847e4ab
Author: deemoliu <qi...@uber.com>
AuthorDate: Tue Nov 22 15:34:00 2022 -0800

    Add max merger and min mergers for partial upsert (#9665)
---
 .../segment/local/upsert/merger/MaxMerger.java     | 33 ++++++++++++++++++++++
 .../segment/local/upsert/merger/MinMerger.java     | 33 ++++++++++++++++++++++
 .../upsert/merger/PartialUpsertMergerFactory.java  |  6 ++++
 .../upsert/merger/PartialUpsertMergerTest.java     | 10 +++++++
 .../pinot/spi/config/table/UpsertConfig.java       |  2 +-
 5 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java
new file mode 100644
index 0000000000..9ab421b45d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MaxMerger.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public class MaxMerger implements PartialUpsertMerger {
+
+  MaxMerger() {
+  }
+
+  /**
+   * Keep the maximal value for the given field.
+   */
+  @Override
+  public Object merge(Object previousValue, Object currentValue) {
+    return ((Comparable) previousValue).compareTo((Comparable) currentValue) > 0 ? previousValue : currentValue;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java
new file mode 100644
index 0000000000..49f7f2380f
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/MinMerger.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public class MinMerger implements PartialUpsertMerger {
+
+  MinMerger() {
+  }
+
+  /**
+   * Keep the minimal value for the given field.
+   */
+  @Override
+  public Object merge(Object previousValue, Object currentValue) {
+    return ((Comparable) previousValue).compareTo((Comparable) currentValue) < 0 ? previousValue : currentValue;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
index 23dc985d9f..55e0912c8f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
@@ -29,6 +29,8 @@ public class PartialUpsertMergerFactory {
   private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
   private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
   private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final MaxMerger MAX_MERGER = new MaxMerger();
+  private static final MinMerger MIN_MERGER = new MinMerger();
   private static final UnionMerger UNION_MERGER = new UnionMerger();
 
   public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
@@ -39,6 +41,10 @@ public class PartialUpsertMergerFactory {
         return INCREMENT_MERGER;
       case IGNORE:
         return IGNORE_MERGER;
+      case MAX:
+        return MAX_MERGER;
+      case MIN:
+        return MIN_MERGER;
       case OVERWRITE:
         return OVERWRITE_MERGER;
       case UNION:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
index 7a2c398977..ae75c81c4c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerTest.java
@@ -48,6 +48,16 @@ public class PartialUpsertMergerTest {
     assertEquals(3, ignoreMerger.merge(3, null));
   }
 
+  @Test
+  public void testMaxMinMergers() {
+    MaxMerger maxMerger = new MaxMerger();
+    MinMerger minMerger = new MinMerger();
+    assertEquals(1, maxMerger.merge(0, 1));
+    assertEquals(0, minMerger.merge(0, 1));
+    assertEquals(1, maxMerger.merge(1, 0));
+    assertEquals(0, minMerger.merge(1, 0));
+  }
+
   @Test
   public void testOverwriteMergers() {
     OverwriteMerger overwriteMerger = new OverwriteMerger();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index ae0522f6cd..0c06d9e07d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -36,7 +36,7 @@ public class UpsertConfig extends BaseJsonConfig {
 
   public enum Strategy {
     // Todo: add CUSTOM strategies
-    APPEND, IGNORE, INCREMENT, OVERWRITE, UNION
+    APPEND, IGNORE, INCREMENT, MAX, MIN, OVERWRITE, UNION
   }
 
   @JsonPropertyDescription("Upsert mode.")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org