You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 08:37:33 UTC
[flink-table-store] branch master updated: [FLINK-27336] Avoid merging when there is only one record
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new badc4cf2 [FLINK-27336] Avoid merging when there is only one record
badc4cf2 is described below
commit badc4cf2d6d3e24c84e0e6c474fb96b90646cd09
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Mon Jul 4 16:37:29 2022 +0800
[FLINK-27336] Avoid merging when there is only one record
This closes #104
---
.../store/file/mergetree/SortBufferMemTable.java | 13 +--
.../mergetree/compact/MergeFunctionHelper.java | 66 +++++++++++++++
.../file/mergetree/compact/SortMergeReader.java | 10 +--
.../compact/MergeFunctionHelperTestBase.java | 97 ++++++++++++++++++++++
.../table/store/file/utils/ReusingTestData.java | 9 +-
5 files changed, 182 insertions(+), 13 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 4cb43a3e..87186888 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionHelper;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -115,7 +116,7 @@ public class SortBufferMemTable implements MemTable {
private class MergeIterator implements Iterator<KeyValue> {
private final MutableObjectIterator<BinaryRowData> kvIter;
private final Comparator<RowData> keyComparator;
- private final MergeFunction mergeFunction;
+ private final MergeFunctionHelper mergeFunctionHelper;
// holds the merged value
private KeyValueSerializer previous;
@@ -131,7 +132,7 @@ public class SortBufferMemTable implements MemTable {
MergeFunction mergeFunction) {
this.kvIter = kvIter;
this.keyComparator = keyComparator;
- this.mergeFunction = mergeFunction;
+ this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
this.previous = new KeyValueSerializer(keyType, valueType);
@@ -170,8 +171,8 @@ public class SortBufferMemTable implements MemTable {
if (previousRow == null) {
return;
}
- mergeFunction.reset();
- mergeFunction.add(previous.getReusedKv().value());
+ mergeFunctionHelper.reset();
+ mergeFunctionHelper.add(previous.getReusedKv().value());
while (readOnce()) {
if (keyComparator.compare(
@@ -179,10 +180,10 @@ public class SortBufferMemTable implements MemTable {
!= 0) {
break;
}
- mergeFunction.add(current.getReusedKv().value());
+ mergeFunctionHelper.add(current.getReusedKv().value());
swapSerializers();
}
- result = mergeFunction.getValue();
+ result = mergeFunctionHelper.getValue();
} while (result == null);
previous.getReusedKv().setValue(result);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
new file mode 100644
index 00000000..f3ae6192
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Helper for the interaction with a {@link MergeFunction}.
+ *
+ * <p>The first record added after a {@link #reset()} is only buffered. The wrapped {@link
+ * MergeFunction} is fed (buffered record first) once a second record is added, so a single record
+ * is handed back by {@link #getValue()} without going through the merge function.
+ */
+public class MergeFunctionHelper {
+
+    private final MergeFunction mergeFunction;
+
+    // first record added since the last reset; null while nothing has been buffered
+    private RowData firstRow;
+    // true once the buffered record has been forwarded to the wrapped merge function
+    private boolean mergeStarted;
+
+    public MergeFunctionHelper(MergeFunction mergeFunction) {
+        this.mergeFunction = mergeFunction;
+    }
+
+    /**
+     * Brings the helper back to its default state: drops the buffered record, resets the wrapped
+     * {@link MergeFunction} and clears the flag telling that the merge function has been fed.
+     */
+    public void reset() {
+        firstRow = null;
+        mergeFunction.reset();
+        mergeStarted = false;
+    }
+
+    /**
+     * Adds the given {@link RowData} to the helper.
+     *
+     * @param value record to add; it is buffered if nothing is buffered yet, otherwise it is
+     *     passed to the wrapped {@link MergeFunction}
+     */
+    public void add(RowData value) {
+        if (firstRow == null) {
+            // nothing buffered yet: keep the record, the merge function is not touched
+            firstRow = value;
+            return;
+        }
+        if (!mergeStarted) {
+            // a second record arrived: hand over the buffered record before the new one
+            mergeFunction.add(firstRow);
+            mergeStarted = true;
+        }
+        mergeFunction.add(value);
+    }
+
+    /**
+     * Get current value of the {@link MergeFunction} helper. Return null if the value should be
+     * skipped.
+     *
+     * @return the value of the wrapped {@link MergeFunction} once it has been fed, otherwise the
+     *     buffered record (null if nothing was added since the last reset)
+     */
+    public RowData getValue() {
+        if (mergeStarted) {
+            return mergeFunction.getValue();
+        }
+        return firstRow;
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
index 3ad22100..64112d70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -42,7 +42,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
private final List<RecordReader<KeyValue>> nextBatchReaders;
private final Comparator<RowData> userKeyComparator;
- private final MergeFunction mergeFunction;
+ private final MergeFunctionHelper mergeFunctionHelper;
private final PriorityQueue<Element> minHeap;
private final List<Element> polled;
@@ -53,7 +53,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
MergeFunction mergeFunction) {
this.nextBatchReaders = new ArrayList<>(readers);
this.userKeyComparator = userKeyComparator;
- this.mergeFunction = mergeFunction;
+ this.mergeFunctionHelper = new MergeFunctionHelper(mergeFunction);
this.minHeap =
new PriorityQueue<>(
@@ -130,7 +130,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
if (!hasMore) {
return null;
}
- RowData mergedValue = mergeFunction.getValue();
+ RowData mergedValue = mergeFunctionHelper.getValue();
if (mergedValue != null) {
return polled.get(polled.size() - 1).kv.setValue(mergedValue);
}
@@ -163,7 +163,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
return false;
}
- mergeFunction.reset();
+ mergeFunctionHelper.reset();
RowData key =
Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.")
.kv
@@ -177,7 +177,7 @@ public class SortMergeReader implements RecordReader<KeyValue> {
break;
}
minHeap.poll();
- mergeFunction.add(element.kv.value());
+ mergeFunctionHelper.add(element.kv.value());
polled.add(element);
}
return true;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
new file mode 100644
index 00000000..debc67f3
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.row;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link MergeFunctionHelper}. */
+public abstract class MergeFunctionHelperTestBase {
+
+ protected MergeFunctionHelper mergeFunctionHelper;
+
+ protected abstract MergeFunction createMergeFunction();
+
+ protected abstract RowData getExpected(List<RowData> rows);
+
+ @BeforeEach
+ void setUp() {
+ mergeFunctionHelper = new MergeFunctionHelper(createMergeFunction());
+ }
+
+ @MethodSource("provideMergedRowData")
+ @ParameterizedTest
+ public void testMergeFunctionHelper(List<RowData> rows) {
+ rows.forEach(r -> mergeFunctionHelper.add(r));
+ assertEquals(getExpected(rows), mergeFunctionHelper.getValue());
+ }
+
+ public static Stream<Arguments> provideMergedRowData() {
+ return Stream.of(
+ Arguments.of(Collections.singletonList(row(1))),
+ Arguments.of(Arrays.asList(row(-1), row(1))),
+ Arguments.of(Arrays.asList(row(1), row(2))));
+ }
+
+ /** Tests for {@link MergeFunctionHelper} with {@link DeduplicateMergeFunction}. */
+ public static class WithDeduplicateMergeFunctionTest extends MergeFunctionHelperTestBase {
+
+ @Override
+ protected MergeFunction createMergeFunction() {
+ return new DeduplicateMergeFunction();
+ }
+
+ @Override
+ protected RowData getExpected(List<RowData> rows) {
+ return rows.get(rows.size() - 1);
+ }
+ }
+
+ /** Tests for {@link MergeFunctionHelper} with {@link ValueCountMergeFunction}. */
+ public static class WithValueRecordMergeFunctionTest extends MergeFunctionHelperTestBase {
+
+ @Override
+ protected MergeFunction createMergeFunction() {
+ return new ValueCountMergeFunction();
+ }
+
+ @Override
+ protected RowData getExpected(List<RowData> rows) {
+ if (rows.size() == 1) {
+ return rows.get(0);
+ } else {
+ long total = rows.stream().mapToLong(r -> r.getLong(0)).sum();
+ return total == 0 ? null : GenericRowData.of(total);
+ }
+ }
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 44c1555f..e785b98f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -105,7 +105,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
random.nextInt(numRecords),
generator.next(),
random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
- random.nextInt(10) - 5));
+ getValue(random)));
}
return result;
}
@@ -123,11 +123,16 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
key,
generator.next(),
random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
- random.nextInt(10) - 5));
+ getValue(random)));
}
return new ArrayList<>(result.values());
}
+    /** Draws {@code random.nextInt(10) - 5} repeatedly and returns the first non-zero draw. */
+    private static long getValue(ThreadLocalRandom random) {
+        int drawn;
+        do {
+            drawn = random.nextInt(10) - 5;
+        } while (drawn == 0);
+        return drawn;
+    }
+
private static class SequenceNumberGenerator {
private final Set<Long> usedSequenceNumber;