You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/14 09:50:37 UTC

[flink-table-store] 03/05: [FLINK-25820] Introduce FileStoreSourceSplitReader

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit a7819f72ea94fce97552e5e4da56c91b92932929
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 26 18:28:31 2022 +0800

    [FLINK-25820] Introduce FileStoreSourceSplitReader
---
 .../source/FileStoreSourceSplitReader.java         | 203 +++++++++++++++
 .../source/FileStoreSourceSplitReaderTest.java     | 284 +++++++++++++++++++++
 .../store/connector/source/TestFileStoreRead.java  | 117 +++++++++
 3 files changed, 604 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
new file mode 100644
index 0000000..7db53e2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.impl.FileRecords;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/** The {@link SplitReader} implementation for the file store source. */
+public class FileStoreSourceSplitReader
+        implements SplitReader<RecordAndPosition<RowData>, FileStoreSourceSplit> {
+
+    private final FileStoreRead fileStoreRead;
+    private final boolean keyAsRecord;
+
+    private final Queue<FileStoreSourceSplit> splits;
+
+    private final Pool<FileStoreRecordIterator> pool;
+
+    @Nullable private RecordReader currentReader;
+    @Nullable private String currentSplitId;
+    private long currentNumRead;
+    private RecordReader.RecordIterator currentFirstBatch;
+
+    public FileStoreSourceSplitReader(FileStoreRead fileStoreRead, boolean keyAsRecord) {
+        this.fileStoreRead = fileStoreRead;
+        this.keyAsRecord = keyAsRecord;
+        this.splits = new LinkedList<>();
+        this.pool = new Pool<>(1);
+        this.pool.add(new FileStoreRecordIterator());
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<RowData>> fetch() throws IOException {
+        checkSplitOrStartNext();
+
+        // pool first, pool size is 1, the underlying implementation does not allow multiple batches
+        // to be read at the same time
+        FileStoreRecordIterator iterator = pool();
+
+        RecordReader.RecordIterator nextBatch;
+        if (currentFirstBatch != null) {
+            nextBatch = currentFirstBatch;
+            currentFirstBatch = null;
+        } else {
+            nextBatch = currentReader.readBatch();
+        }
+        if (nextBatch == null) {
+            pool.recycler().recycle(iterator);
+            return finishSplit();
+        }
+        return FileRecords.forRecords(currentSplitId, iterator.replace(nextBatch));
+    }
+
+    private FileStoreRecordIterator pool() throws IOException {
+        try {
+            return this.pool.pollEntry();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted");
+        }
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<FileStoreSourceSplit> splitsChange) {
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChange.getClass()));
+        }
+
+        splits.addAll(splitsChange.splits());
+    }
+
+    @Override
+    public void wakeUp() {}
+
+    @Override
+    public void close() throws Exception {
+        if (currentReader != null) {
+            currentReader.close();
+        }
+    }
+
+    private void checkSplitOrStartNext() throws IOException {
+        if (currentReader != null) {
+            return;
+        }
+
+        final FileStoreSourceSplit nextSplit = splits.poll();
+        if (nextSplit == null) {
+            throw new IOException("Cannot fetch from another split - no split remaining");
+        }
+
+        currentSplitId = nextSplit.splitId();
+        currentReader =
+                fileStoreRead.createReader(
+                        nextSplit.partition(), nextSplit.bucket(), nextSplit.files());
+        currentNumRead = nextSplit.recordsToSkip();
+        if (currentNumRead > 0) {
+            seek(currentNumRead);
+        }
+    }
+
+    private void seek(long toSkip) throws IOException {
+        while (true) {
+            RecordReader.RecordIterator nextBatch = currentReader.readBatch();
+            if (nextBatch == null) {
+                throw new RuntimeException(
+                        String.format(
+                                "skip(%s) more than the number of remaining elements.", toSkip));
+            }
+            while (toSkip > 0 && nextBatch.next() != null) {
+                toSkip--;
+            }
+            if (toSkip == 0) {
+                currentFirstBatch = nextBatch;
+                return;
+            }
+            nextBatch.releaseBatch();
+        }
+    }
+
+    private FileRecords<RowData> finishSplit() throws IOException {
+        if (currentReader != null) {
+            currentReader.close();
+            currentReader = null;
+        }
+
+        final FileRecords<RowData> finishRecords = FileRecords.finishedSplit(currentSplitId);
+        currentSplitId = null;
+        return finishRecords;
+    }
+
+    private class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowData> {
+
+        private RecordReader.RecordIterator iterator;
+
+        private final MutableRecordAndPosition<RowData> recordAndPosition =
+                new MutableRecordAndPosition<>();
+
+        public FileStoreRecordIterator replace(RecordReader.RecordIterator iterator) {
+            this.iterator = iterator;
+            this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentNumRead);
+            return this;
+        }
+
+        @Nullable
+        @Override
+        public RecordAndPosition<RowData> next() {
+            try {
+                KeyValue kv = iterator.next();
+                if (kv == null) {
+                    return null;
+                }
+                recordAndPosition.setNext(keyAsRecord ? kv.key() : kv.value());
+                currentNumRead++;
+                return recordAndPosition;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void releaseBatch() {
+            this.iterator.releaseBatch();
+            pool.recycler().recycle(this);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
new file mode 100644
index 0000000..89068e6
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FileStoreSourceSplitReader}. */
+public class FileStoreSourceSplitReaderTest {
+
+    private static ExecutorService service;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private final AtomicInteger v = new AtomicInteger(0);
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testKeyAsRecord() throws Exception {
+        innerTestOnce(true);
+    }
+
+    @Test
+    public void testNonKeyAsRecord() throws Exception {
+        innerTestOnce(false);
+    }
+
+    private void innerTestOnce(boolean keyAsRecord) throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, keyAsRecord);
+
+        List<Tuple2<Integer, Integer>> input = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input.stream()
+                        .map(tuple2 -> keyAsRecord ? tuple2._1 : tuple2._2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleBatchInSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                5,
+                input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 3));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                3,
+                input.subList(3, input.size()).stream()
+                        .map(Tuple2::_2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testRestoreMultipleBatchInSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 7));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                7,
+                input2.subList(2, input2.size()).stream()
+                        .map(Tuple2::_2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleSplits() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files1 = read.writeFiles(row(1), 0, input1);
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files1));
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(2), 1, input2);
+        assignSplit(reader, new FileStoreSourceSplit("id2", row(2), 1, files2));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id2",
+                0,
+                input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id2", "id2", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testNoSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+        assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
+        reader.close();
+    }
+
+    private void assertRecords(
+            RecordsWithSplitIds<RecordAndPosition<RowData>> records,
+            String finishedSplit,
+            String nextSplit,
+            long startRecordSkipCount,
+            List<Integer> expected) {
+        if (finishedSplit != null) {
+            assertThat(records.finishedSplits()).isEqualTo(Collections.singleton(finishedSplit));
+            return;
+        } else {
+            assertThat(records.finishedSplits()).isEmpty();
+        }
+
+        assertThat(records.nextSplit()).isEqualTo(nextSplit);
+
+        List<Integer> result = new ArrayList<>();
+        RecordAndPosition<RowData> record;
+        while ((record = records.nextRecordFromSplit()) != null) {
+            result.add(record.getRecord().getInt(0));
+            assertThat(record.getRecordSkipCount()).isEqualTo(++startRecordSkipCount);
+        }
+        records.recycle();
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    private List<Tuple2<Integer, Integer>> kvs() {
+        List<Tuple2<Integer, Integer>> kvs = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            kvs.add(new Tuple2<>(next(), next()));
+        }
+        return kvs;
+    }
+
+    private int next() {
+        return v.incrementAndGet();
+    }
+
+    private void assignSplit(FileStoreSourceSplitReader reader, FileStoreSourceSplit split) {
+        SplitsChange<FileStoreSourceSplit> splitsChange =
+                new SplitsAddition<>(Collections.singletonList(split));
+        reader.handleSplitsChanges(splitsChange);
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
new file mode 100644
index 0000000..143d446
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import scala.Tuple2;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test {@link FileStoreRead}.
+ *
+ * <p>TODO: remove this, use FileStoreReadImpl.
+ */
+public class TestFileStoreRead implements FileStoreRead {
+
+    private static final Comparator<RowData> COMPARATOR = Comparator.comparingInt(o -> o.getInt(0));
+
+    private final FileStorePathFactory pathFactory;
+
+    private final ExecutorService service;
+
+    public TestFileStoreRead(Path root, ExecutorService service) {
+        this.pathFactory = new FileStorePathFactory(root, RowType.of(new IntType()), "default");
+        this.service = service;
+    }
+
+    @Override
+    public void withKeyProjection(int[][] projectedFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void withValueProjection(int[][] projectedFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+            throws IOException {
+        MergeTree mergeTree = createMergeTree(partition, bucket);
+        List<List<SortedRun>> runs = new IntervalPartition(files, COMPARATOR).partition();
+        return mergeTree.createReader(runs, true);
+    }
+
+    private MergeTree createMergeTree(BinaryRowData partition, int bucket) {
+        MergeTreeOptions options = new MergeTreeOptions(new Configuration());
+        SstFile sstFile =
+                new SstFile.Factory(
+                                new RowType(
+                                        singletonList(new RowType.RowField("k", new IntType()))),
+                                new RowType(
+                                        singletonList(new RowType.RowField("v", new IntType()))),
+                                FileFormat.fromIdentifier(
+                                        Thread.currentThread().getContextClassLoader(),
+                                        "avro",
+                                        new Configuration()),
+                                pathFactory,
+                                options.targetFileSize)
+                        .create(partition, bucket);
+        return new MergeTree(options, sstFile, COMPARATOR, service, new DeduplicateAccumulator());
+    }
+
+    public List<SstFileMeta> writeFiles(
+            BinaryRowData partition, int bucket, List<Tuple2<Integer, Integer>> kvs)
+            throws Exception {
+        RecordWriter writer = createMergeTree(partition, bucket).createWriter(new ArrayList<>());
+        for (Tuple2<Integer, Integer> tuple2 : kvs) {
+            writer.write(ValueKind.ADD, GenericRowData.of(tuple2._1), GenericRowData.of(tuple2._2));
+        }
+        List<SstFileMeta> files = writer.prepareCommit().newFiles();
+        writer.close();
+        return new ArrayList<>(files);
+    }
+}