You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 08:37:33 UTC

[flink-table-store] branch master updated: [FLINK-27336] Avoid merging when there is only one record

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new badc4cf2 [FLINK-27336] Avoid merging when there is only one record
badc4cf2 is described below

commit badc4cf2d6d3e24c84e0e6c474fb96b90646cd09
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Mon Jul 4 16:37:29 2022 +0800

    [FLINK-27336] Avoid merging when there is only one record
    
    This closes #104
---
 .../store/file/mergetree/SortBufferMemTable.java   | 13 +--
 .../mergetree/compact/MergeFunctionHelper.java     | 66 +++++++++++++++
 .../file/mergetree/compact/SortMergeReader.java    | 10 +--
 .../compact/MergeFunctionHelperTestBase.java       | 97 ++++++++++++++++++++++
 .../table/store/file/utils/ReusingTestData.java    |  9 +-
 5 files changed, 182 insertions(+), 13 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 4cb43a3e..87186888 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionHelper;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -115,7 +116,7 @@ public class SortBufferMemTable implements MemTable {
     private class MergeIterator implements Iterator<KeyValue> {
         private final MutableObjectIterator<BinaryRowData> kvIter;
         private final Comparator<RowData> keyComparator;
-        private final MergeFunction mergeFunction;
+        private final MergeFunctionHelper mergeFunctionHelper;
 
         // holds the merged value
         private KeyValueSerializer previous;
@@ -131,7 +132,7 @@ public class SortBufferMemTable implements MemTable {
                 MergeFunction mergeFunction) {
             this.kvIter = kvIter;
             this.keyComparator = keyComparator;
-            this.mergeFunction = mergeFunction;
+            this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
 
             int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
             this.previous = new KeyValueSerializer(keyType, valueType);
@@ -170,8 +171,8 @@ public class SortBufferMemTable implements MemTable {
                 if (previousRow == null) {
                     return;
                 }
-                mergeFunction.reset();
-                mergeFunction.add(previous.getReusedKv().value());
+                mergeFunctionHelper.reset();
+                mergeFunctionHelper.add(previous.getReusedKv().value());
 
                 while (readOnce()) {
                     if (keyComparator.compare(
@@ -179,10 +180,10 @@ public class SortBufferMemTable implements MemTable {
                             != 0) {
                         break;
                     }
-                    mergeFunction.add(current.getReusedKv().value());
+                    mergeFunctionHelper.add(current.getReusedKv().value());
                     swapSerializers();
                 }
-                result = mergeFunction.getValue();
+                result = mergeFunctionHelper.getValue();
             } while (result == null);
             previous.getReusedKv().setValue(result);
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
new file mode 100644
index 00000000..f3ae6192
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Wraps a {@link MergeFunction} to avoid merging a key that has only one record: the first record
+ * is buffered and forwarded only once a second one arrives, so a lone record is returned as-is by
+ * {@link #getValue()}. This matches direct merging only if the function keeps lone records as-is.
+ */
+public class MergeFunctionHelper {
+
+    private final MergeFunction mergeFunction;
+
+    private RowData rowData;
+    private boolean hasRecord; // explicit flag, so rowData == null is not mistaken for "empty"
+    private boolean isInitialized; // true once rowData has been forwarded to mergeFunction
+
+    public MergeFunctionHelper(MergeFunction mergeFunction) {
+        this.mergeFunction = mergeFunction;
+    }
+
+    /** Drops the buffered record and resets the wrapped {@link MergeFunction}. */
+    public void reset() {
+        rowData = null;
+        hasRecord = false;
+        isInitialized = false;
+        mergeFunction.reset();
+    }
+
+    /** Buffers the first record; from the second on, forwards every record to the function. */
+    public void add(RowData value) {
+        if (!hasRecord) {
+            rowData = value;
+            hasRecord = true;
+            return;
+        }
+        if (!isInitialized) {
+            mergeFunction.add(rowData);
+            isInitialized = true;
+        }
+        mergeFunction.add(value);
+    }
+
+    /** Returns the lone or merged record; null if none was added or it should be skipped. */
+    public RowData getValue() {
+        return isInitialized ? mergeFunction.getValue() : rowData;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
index 3ad22100..64112d70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -42,7 +42,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
 
     private final List<RecordReader<KeyValue>> nextBatchReaders;
     private final Comparator<RowData> userKeyComparator;
-    private final MergeFunction mergeFunction;
+    private final MergeFunctionHelper mergeFunctionHelper;
 
     private final PriorityQueue<Element> minHeap;
     private final List<Element> polled;
@@ -53,7 +53,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
             MergeFunction mergeFunction) {
         this.nextBatchReaders = new ArrayList<>(readers);
         this.userKeyComparator = userKeyComparator;
-        this.mergeFunction = mergeFunction;
+        this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
 
         this.minHeap =
                 new PriorityQueue<>(
@@ -130,7 +130,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
                 if (!hasMore) {
                     return null;
                 }
-                RowData mergedValue = mergeFunction.getValue();
+                RowData mergedValue = mergeFunctionHelper.getValue();
                 if (mergedValue != null) {
                     return polled.get(polled.size() - 1).kv.setValue(mergedValue);
                 }
@@ -163,7 +163,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
                 return false;
             }
 
-            mergeFunction.reset();
+            mergeFunctionHelper.reset();
             RowData key =
                     Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.")
                             .kv
@@ -177,7 +177,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
                     break;
                 }
                 minHeap.poll();
-                mergeFunction.add(element.kv.value());
+                mergeFunctionHelper.add(element.kv.value());
                 polled.add(element);
             }
             return true;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
new file mode 100644
index 00000000..debc67f3
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.row;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link MergeFunctionHelper}. */
+public abstract class MergeFunctionHelperTestBase {
+
+    protected MergeFunctionHelper mergeFunctionHelper;
+
+    protected abstract MergeFunction createMergeFunction();
+
+    protected abstract RowData getExpected(List<RowData> rows);
+
+    @BeforeEach
+    void setUp() {
+        mergeFunctionHelper = new MergeFunctionHelper(createMergeFunction());
+    }
+
+    @MethodSource("provideMergedRowData")
+    @ParameterizedTest
+    public void testMergeFunctionHelper(List<RowData> rows) {
+        rows.forEach(mergeFunctionHelper::add);
+        assertEquals(getExpected(rows), mergeFunctionHelper.getValue());
+    }
+
+    public static Stream<Arguments> provideMergedRowData() {
+        return Stream.of(
+                Arguments.of(Collections.singletonList(row(1))),
+                Arguments.of(Collections.singletonList(row(0))),
+                Arguments.of(Arrays.asList(row(-1), row(1))),
+                Arguments.of(Arrays.asList(row(1), row(2))));
+    }
+
+    /** Tests for {@link MergeFunctionHelper} with {@link DeduplicateMergeFunction}. */
+    public static class WithDeduplicateMergeFunctionTest extends MergeFunctionHelperTestBase {
+
+        @Override
+        protected MergeFunction createMergeFunction() {
+            return new DeduplicateMergeFunction();
+        }
+
+        @Override
+        protected RowData getExpected(List<RowData> rows) {
+            return rows.get(rows.size() - 1);
+        }
+    }
+
+    /** Tests for {@link MergeFunctionHelper} with {@link ValueCountMergeFunction}. */
+    public static class WithValueRecordMergeFunctionTest extends MergeFunctionHelperTestBase {
+
+        @Override
+        protected MergeFunction createMergeFunction() {
+            return new ValueCountMergeFunction();
+        }
+
+        @Override
+        protected RowData getExpected(List<RowData> rows) {
+            if (rows.size() == 1) {
+                return rows.get(0); // a lone record bypasses merging, even with a zero count
+            }
+            long total = rows.stream().mapToLong(r -> r.getLong(0)).sum();
+            return total == 0 ? null : GenericRowData.of(total);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 44c1555f..e785b98f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -105,7 +105,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
                             random.nextInt(numRecords),
                             generator.next(),
                             random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
-                            random.nextInt(10) - 5));
+                            getValue(random)));
         }
         return result;
     }
@@ -123,11 +123,16 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
                             key,
                             generator.next(),
                             random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
-                            random.nextInt(10) - 5));
+                            getValue(random)));
         }
         return new ArrayList<>(result.values());
     }
 
+    private static long getValue(ThreadLocalRandom random) {
+        // Nonzero value in [-5, 5); zero presumably avoided since lone records now skip merging.
+        return random.ints(-5, 5).filter(v -> v != 0).findFirst().getAsInt();
+    }
+
     private static class SequenceNumberGenerator {
         private final Set<Long> usedSequenceNumber;