You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 08:55:19 UTC
[flink-table-store] 03/06: [FLINK-25628] Introduce ConcatRecordReader
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 610269895727efaa314d54253f5b1acfc3f769f5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:44:01 2022 +0800
[FLINK-25628] Introduce ConcatRecordReader
Co-authored-by: Jane Chan <55...@users.noreply.github.com>
Co-authored-by: tsreaper <ts...@gmail.com>
---
.../file/mergetree/compact/ConcatRecordReader.java | 84 ++++++++++++++++++++++
.../mergetree/compact/ConcatRecordReaderTest.java | 67 +++++++++++++++++
2 files changed, 151 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
new file mode 100644
index 0000000..e8c6e44
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * A {@link RecordReader} that concatenates several readers and reads them one after another.
+ *
+ * <p>Readers are obtained lazily: a {@link ReaderSupplier} is only invoked once every reader
+ * before it has been drained. The input list is expected to be already sorted by key and
+ * sequence number, with key intervals that do not overlap each other; this class does not
+ * verify that ordering itself.
+ */
+public class ConcatRecordReader implements RecordReader {
+
+    /** Suppliers of the readers that have not been opened yet, in reading order. */
+    private final Queue<ReaderSupplier> queue;
+
+    /** Reader currently being drained, or {@code null} when none is open. */
+    private RecordReader current;
+
+    /**
+     * Creates a reader over the given suppliers.
+     *
+     * @param readerFactories suppliers of the readers to concatenate, in reading order; the list
+     *     is copied and every element must be non-null
+     */
+    protected ConcatRecordReader(List<ReaderSupplier> readerFactories) {
+        for (ReaderSupplier factory : readerFactories) {
+            Preconditions.checkNotNull(factory, "Reader factory must not be null.");
+        }
+        this.queue = new LinkedList<>(readerFactories);
+    }
+
+    /**
+     * Builds a reader over {@code readers}.
+     *
+     * <p>When the list holds exactly one supplier, that supplier is invoked right away and its
+     * reader is returned directly, without a wrapper. Otherwise a {@link ConcatRecordReader} is
+     * returned and the suppliers are invoked lazily while reading.
+     *
+     * @param readers suppliers of the readers to concatenate, in reading order
+     * @return a reader yielding the batches of all supplied readers in sequence
+     * @throws IOException if the single supplier fails to create its reader
+     */
+    public static RecordReader create(List<ReaderSupplier> readers) throws IOException {
+        return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers);
+    }
+
+    /**
+     * Returns the next batch of the reader currently being drained. Whenever that reader returns
+     * {@code null} it is closed and the next supplier in the queue is invoked.
+     *
+     * @return the next batch, or {@code null} once no reader is left
+     * @throws IOException if creating, reading or closing an underlying reader fails
+     */
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        while (true) {
+            if (current == null) {
+                // Suppliers are checked to be non-null, so a null poll means the queue is empty.
+                ReaderSupplier next = queue.poll();
+                if (next == null) {
+                    return null;
+                }
+                current = next.get();
+                // Re-check: if the supplier handed back null, move on to the next one.
+                continue;
+            }
+            RecordIterator batch = current.readBatch();
+            if (batch != null) {
+                return batch;
+            }
+            closeCurrent();
+        }
+    }
+
+    /**
+     * Closes the reader currently being drained, if any, and discards the suppliers that were
+     * never invoked. Calling this method again has no further effect, and a later {@link
+     * #readBatch()} returns {@code null} instead of opening readers that nobody would close.
+     *
+     * @throws IOException if closing the current reader fails
+     */
+    @Override
+    public void close() throws IOException {
+        queue.clear();
+        closeCurrent();
+    }
+
+    /** Closes the reader being drained, if any, and forgets it even when closing fails. */
+    private void closeCurrent() throws IOException {
+        RecordReader reader = current;
+        // Clear the field first so the same reader is never closed twice by this class.
+        current = null;
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /** Supplier to get {@link RecordReader}. */
+    @FunctionalInterface
+    public interface ReaderSupplier {
+        RecordReader get() throws IOException;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
new file mode 100644
index 0000000..6ed2edf
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Tests for {@link ConcatRecordReader}. */
+public class ConcatRecordReaderTest extends CombiningRecordReaderTestBase {
+
+    /** This test does not restrict itself to add-only input. */
+    @Override
+    protected boolean addOnly() {
+        return false;
+    }
+
+    /** Concatenation is expected to hand the input back unchanged. */
+    @Override
+    protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+        return input;
+    }
+
+    /** Wraps each already-built test reader in a supplier that simply returns it. */
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
+        List<ConcatRecordReader.ReaderSupplier> suppliers =
+                readers.stream()
+                        .<ConcatRecordReader.ReaderSupplier>map(reader -> () -> reader)
+                        .collect(Collectors.toList());
+        return new ConcatRecordReader(suppliers);
+    }
+
+    /** Runs two small hand-written inputs, each with an empty reader in the middle. */
+    @Test
+    public void testSmallData() throws IOException {
+        String firstHead =
+                "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, -, 500 | "
+                        + "7, 4, +, 700 | 9, 20, +, 900";
+        String firstTail = "12, 6, +, 1200 | 14, 7, +, 1400 | 16, 8, -, 1600 | 18, 9, -, 1800";
+        runTest(parseData(firstHead, "", firstTail));
+
+        String secondHead =
+                " 1, 10, +, 100 | 3, 20, +, 300 | 5, 30, -, 500 | "
+                        + " 7, 40, +, 700 | 9, 200, -, 900";
+        String secondTail =
+                " 12, 60, +, 1200 | 14, 70, -, 1400 | 16, 80, +, 1600 | 18, 90, -, 1800";
+        runTest(parseData(secondHead, "", secondTail));
+    }
+}