Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 08:55:19 UTC

[flink-table-store] 03/06: [FLINK-25628] Introduce ConcatRecordReader

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 610269895727efaa314d54253f5b1acfc3f769f5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:44:01 2022 +0800

    [FLINK-25628] Introduce ConcatRecordReader
    
    Co-authored-by: Jane Chan <55...@users.noreply.github.com>
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../file/mergetree/compact/ConcatRecordReader.java | 84 ++++++++++++++++++++++
 .../mergetree/compact/ConcatRecordReaderTest.java  | 67 +++++++++++++++++
 2 files changed, 151 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
new file mode 100644
index 0000000..e8c6e44
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * This reader concatenates a list of {@link RecordReader}s and reads them
+ * sequentially. The input list is already sorted by key and sequence number, and the key
+ * intervals do not overlap each other.
+ */
+public class ConcatRecordReader implements RecordReader {
+
+    private final Queue<ReaderSupplier> queue;
+
+    private RecordReader current;
+
+    protected ConcatRecordReader(List<ReaderSupplier> readerFactories) {
+        readerFactories.forEach(
+                supplier ->
+                        Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
+        this.queue = new LinkedList<>(readerFactories);
+    }
+
+    public static RecordReader create(List<ReaderSupplier> readers) throws IOException {
+        return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        while (true) {
+            if (current != null) {
+                RecordIterator iterator = current.readBatch();
+                if (iterator != null) {
+                    return iterator;
+                }
+                current.close();
+                current = null;
+            } else if (queue.size() > 0) {
+                current = queue.poll().get();
+            } else {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (current != null) {
+            current.close();
+        }
+    }
+
+    /** Supplier to get {@link RecordReader}. */
+    @FunctionalInterface
+    public interface ReaderSupplier {
+        RecordReader get() throws IOException;
+    }
+}
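
[Editor's note, not part of the commit] A minimal usage sketch of the new reader, under two
assumptions: readerA and readerB are existing RecordReader instances whose key ranges do not
overlap, and RecordIterator is a nested type of RecordReader (it is referenced unqualified in
the class above).

    import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
    import org.apache.flink.table.store.file.utils.RecordReader;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class ConcatRecordReaderUsageSketch {

        // readerA and readerB are hypothetical readers supplied by the caller; in real use
        // the suppliers would typically defer opening the underlying readers.
        static void readAll(RecordReader readerA, RecordReader readerB) throws IOException {
            List<ConcatRecordReader.ReaderSupplier> suppliers =
                    Arrays.asList(() -> readerA, () -> readerB);

            RecordReader reader = ConcatRecordReader.create(suppliers);
            try {
                RecordReader.RecordIterator batch;
                // readBatch() drains one reader at a time, closing each exhausted reader
                // before pulling the next supplier, and returns null when all are done.
                while ((batch = reader.readBatch()) != null) {
                    // ... consume the records in this batch ...
                }
            } finally {
                reader.close();
            }
        }
    }

Because exhausted readers are closed inside readBatch() and suppliers are opened one at a time,
the caller only has to close the reader returned by create().
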
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
new file mode 100644
index 0000000..6ed2edf
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Tests for {@link ConcatRecordReader}. */
+public class ConcatRecordReaderTest extends CombiningRecordReaderTestBase {
+
+    @Override
+    protected boolean addOnly() {
+        return false;
+    }
+
+    @Override
+    protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+        return input;
+    }
+
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
+        return new ConcatRecordReader(
+                readers.stream()
+                        .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testSmallData() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, -, 500 | "
+                                + "7, 4, +, 700 | 9, 20, +, 900",
+                        "",
+                        "12, 6, +, 1200 |  14, 7, +, 1400 |  16, 8, -, 1600 |  18, 9, -, 1800"));
+        runTest(
+                parseData(
+                        " 1, 10, +, 100 |  3, 20, +, 300 |  5, 30, -, 500 | "
+                                + " 7, 40, +, 700 |  9, 200, -, 900",
+                        "",
+                        " 12, 60, +, 1200 |  14, 70, -, 1400 |  16, 80, +, 1600 |  18, 90, -, 1800"));
+    }
+}
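
[Editor's note, not part of the commit] The test builds the reader through the ConcatRecordReader
constructor rather than create(), presumably because create() short-circuits a single-element
list and returns the supplied reader directly. A sketch of that short-circuit, with onlyReader
standing in for any existing RecordReader:

    import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
    import org.apache.flink.table.store.file.utils.RecordReader;

    import java.io.IOException;
    import java.util.Collections;

    public class CreateShortCircuitSketch {

        // With exactly one supplier, create() returns the supplied reader itself,
        // so no ConcatRecordReader wrapper is allocated.
        static RecordReader openSingle(RecordReader onlyReader) throws IOException {
            return ConcatRecordReader.create(Collections.singletonList(() -> onlyReader));
        }
    }

Constructing the wrapper directly keeps the concatenating logic itself under test regardless of
how many input readers a test case provides.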