You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/19 04:08:58 UTC

[incubator-paimon] branch master updated: [core] Fix Object reusing bug in PartialUpdateMergeFunction (#1594)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a9152ca [core] Fix Object reusing bug in PartialUpdateMergeFunction (#1594)
d6a9152ca is described below

commit d6a9152ca3e01add45f00f357776f09c90001ff8
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Jul 19 12:08:54 2023 +0800

    [core] Fix Object reusing bug in PartialUpdateMergeFunction (#1594)
---
 .../paimon/mergetree/compact/MergeFunction.java    |  4 +-
 .../compact/PartialUpdateMergeFunction.java        | 27 ++++---
 .../mergetree/SortBufferWriteBufferTestBase.java   | 91 +++++++++++++++++++++-
 .../mergetree/compact/MergeFunctionTestUtils.java  | 52 +++++++++++++
 4 files changed, 159 insertions(+), 15 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
index 1e8c7be8b..2141cd64e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
@@ -29,8 +29,8 @@ import javax.annotation.Nullable;
  *
  * <ul>
  *   <li>Please don't save KeyValue and InternalRow references to the List: the KeyValue of the
- *       first two articles and the InternalRow object inside them are safe, but the reference of
- *       the third article may overwrite the reference of the first article.
+ *       first two objects and the InternalRow object inside them are safe, but the reference of the
+ *       third object may overwrite the reference of the first object.
  *   <li>You can save fields references: fields don't reuse their objects.
  * </ul>
  *
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3bbcafd00..d99d96586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -56,7 +56,9 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     private final boolean ignoreDelete;
     private final Map<Integer, SequenceGenerator> fieldSequences;
 
-    private KeyValue latestKv;
+    private InternalRow currentKey;
+    private long latestSequenceNumber;
+    private boolean isEmpty;
     private GenericRow row;
     private KeyValue reused;
 
@@ -71,13 +73,18 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
 
     @Override
     public void reset() {
-        this.latestKv = null;
+        this.currentKey = null;
         this.row = new GenericRow(getters.length);
+        this.isEmpty = true;
     }
 
     @Override
     public void add(KeyValue kv) {
-        if (kv.valueKind() == RowKind.UPDATE_BEFORE || kv.valueKind() == RowKind.DELETE) {
+        // refresh the key object to avoid the reference being overwritten
+        currentKey = kv.key();
+
+        // ignore delete?
+        if (kv.valueKind().isRetract()) {
             if (ignoreDelete) {
                 return;
             }
@@ -91,7 +98,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                     "Partial update can not accept delete records. Partial delete is not supported!");
         }
 
-        latestKv = kv;
+        latestSequenceNumber = kv.sequenceNumber();
+        isEmpty = false;
         if (fieldSequences.isEmpty()) {
             updateNonNullFields(kv);
         } else {
@@ -131,19 +139,14 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     @Override
     @Nullable
     public KeyValue getResult() {
-        if (latestKv == null) {
-            if (ignoreDelete) {
-                return null;
-            }
-
-            throw new IllegalArgumentException(
-                    "Trying to get result from merge function without any input. This is unexpected.");
+        if (isEmpty) {
+            return null;
         }
 
         if (reused == null) {
             reused = new KeyValue();
         }
-        return reused.replace(latestKv.key(), latestKv.sequenceNumber(), RowKind.INSERT, row);
+        return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
     }
 
     public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index e4aaa3dc2..0e51dbe8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -18,16 +18,23 @@
 
 package org.apache.paimon.mergetree;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeFunctionTestUtils;
+import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
 import org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
+import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryInMemorySortBuffer;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingKeyValue;
@@ -106,7 +113,10 @@ public abstract class SortBufferWriteBufferTestBase {
                 KEY_COMPARATOR,
                 createMergeFunction(),
                 null,
-                kv -> expected.poll().assertEquals(kv));
+                kv -> {
+                    assertThat(expected.isEmpty()).isFalse();
+                    expected.poll().assertEquals(kv);
+                });
         assertThat(expected).isEmpty();
     }
 
@@ -177,4 +187,83 @@ public abstract class SortBufferWriteBufferTestBase {
             runTest(ReusingTestData.parse("1, 2, +, 100 | 1, 1, +, -100"));
         }
     }
+
+    /** Test for {@link SortBufferWriteBuffer} with {@link PartialUpdateMergeFunction}. */
+    public static class WithPartialUpdateMergeFunctionTest extends SortBufferWriteBufferTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return MergeFunctionTestUtils.getExpectedForPartialUpdate(input);
+        }
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            Options options = new Options();
+            options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
+            return PartialUpdateMergeFunction.factory(options, RowType.of(DataTypes.BIGINT()))
+                    .create();
+        }
+    }
+
+    /** Test for {@link SortBufferWriteBuffer} with {@link AggregateMergeFunction}. */
+    public static class WithAggMergeFunctionTest extends SortBufferWriteBufferTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return MergeFunctionTestUtils.getExpectedForAggSum(input);
+        }
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            Options options = new Options();
+            options.set("fields.value.aggregate-function", "sum");
+            return AggregateMergeFunction.factory(
+                            options,
+                            Collections.singletonList("value"),
+                            Collections.singletonList(DataTypes.BIGINT()),
+                            Collections.emptyList())
+                    .create();
+        }
+    }
+
+    /** Test for {@link SortBufferWriteBuffer} with {@link LookupMergeFunction}. */
+    public static class WithLookupFunctionTest extends SortBufferWriteBufferTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return MergeFunctionTestUtils.getExpectedForAggSum(input);
+        }
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            Options options = new Options();
+            options.set("fields.value.aggregate-function", "sum");
+            MergeFunctionFactory<KeyValue> aggMergeFunction =
+                    AggregateMergeFunction.factory(
+                            options,
+                            Collections.singletonList("value"),
+                            Collections.singletonList(DataTypes.BIGINT()),
+                            Collections.emptyList());
+            return LookupMergeFunction.wrap(
+                            aggMergeFunction,
+                            RowType.of(DataTypes.INT()),
+                            RowType.of(DataTypes.BIGINT()))
+                    .create();
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 813f16ee3..4b9da284c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.paimon.utils.ReusingTestData;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -69,6 +70,57 @@ public class MergeFunctionTestUtils {
         return expected;
     }
 
+    public static List<ReusingTestData> getExpectedForPartialUpdate(List<ReusingTestData> input) {
+        input = new ArrayList<>(input);
+        Collections.sort(input);
+
+        LinkedHashMap<Integer, List<ReusingTestData>> groups = new LinkedHashMap<>();
+        for (ReusingTestData d : input) {
+            groups.computeIfAbsent(d.key, k -> new ArrayList<>()).add(d);
+        }
+
+        List<ReusingTestData> expected = new ArrayList<>();
+        for (List<ReusingTestData> group : groups.values()) {
+            if (group.size() == 1) {
+                // a lone record is expected as-is, due to ReducerMergeFunctionWrapper
+                expected.add(group.get(0));
+            } else {
+                group.stream()
+                        .filter(d -> d.valueKind.isAdd())
+                        .reduce((first, second) -> second)
+                        .ifPresent(expected::add);
+            }
+        }
+        return expected;
+    }
+
+    public static List<ReusingTestData> getExpectedForAggSum(List<ReusingTestData> input) {
+        input = new ArrayList<>(input);
+        Collections.sort(input);
+
+        LinkedHashMap<Integer, List<ReusingTestData>> groups = new LinkedHashMap<>();
+        for (ReusingTestData d : input) {
+            groups.computeIfAbsent(d.key, k -> new ArrayList<>()).add(d);
+        }
+
+        List<ReusingTestData> expected = new ArrayList<>();
+        for (List<ReusingTestData> group : groups.values()) {
+            if (group.size() == 1) {
+                // a lone record is expected as-is, due to ReducerMergeFunctionWrapper
+                expected.add(group.get(0));
+            } else {
+                long sum =
+                        group.stream()
+                                .mapToLong(d -> d.valueKind.isAdd() ? d.value : -d.value)
+                                .sum();
+                ReusingTestData last = group.get(group.size() - 1);
+                expected.add(
+                        new ReusingTestData(last.key, last.sequenceNumber, RowKind.INSERT, sum));
+            }
+        }
+        return expected;
+    }
+
     public static void assertKvsEquals(List<KeyValue> expected, List<KeyValue> actual) {
         assertThat(actual).hasSize(expected.size());
         for (int i = 0; i < actual.size(); i++) {