Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/14 09:50:37 UTC
[flink-table-store] 03/05: [FLINK-25820] Introduce FileStoreSourceSplitReader
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit a7819f72ea94fce97552e5e4da56c91b92932929
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 26 18:28:31 2022 +0800
[FLINK-25820] Introduce FileStoreSourceSplitReader
---
.../source/FileStoreSourceSplitReader.java | 203 +++++++++++++++
.../source/FileStoreSourceSplitReaderTest.java | 284 +++++++++++++++++++++
.../store/connector/source/TestFileStoreRead.java | 117 +++++++++
3 files changed, 604 insertions(+)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
new file mode 100644
index 0000000..7db53e2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.impl.FileRecords;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/** The {@link SplitReader} implementation for the file store source. */
+public class FileStoreSourceSplitReader
+ implements SplitReader<RecordAndPosition<RowData>, FileStoreSourceSplit> {
+
+ private final FileStoreRead fileStoreRead;
+ private final boolean keyAsRecord;
+
+ private final Queue<FileStoreSourceSplit> splits;
+
+ private final Pool<FileStoreRecordIterator> pool;
+
+ @Nullable private RecordReader currentReader;
+ @Nullable private String currentSplitId;
+ private long currentNumRead;
+ @Nullable private RecordReader.RecordIterator currentFirstBatch;
+
+ public FileStoreSourceSplitReader(FileStoreRead fileStoreRead, boolean keyAsRecord) {
+ this.fileStoreRead = fileStoreRead;
+ this.keyAsRecord = keyAsRecord;
+ this.splits = new LinkedList<>();
+ this.pool = new Pool<>(1);
+ this.pool.add(new FileStoreRecordIterator());
+ }
+
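+ /**
+ * Reads the next batch from the current split, lazily opening the next queued split when no
+ * reader is open. Once the current reader is exhausted, emits a finished-split marker.
+ */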
+ @Override
+ public RecordsWithSplitIds<RecordAndPosition<RowData>> fetch() throws IOException {
+ checkSplitOrStartNext();
+
+ // Poll the iterator from the pool first. The pool size is 1 because the underlying
+ // implementation does not allow multiple batches to be read at the same time.
+ FileStoreRecordIterator iterator = pool();
+
+ RecordReader.RecordIterator nextBatch;
+ if (currentFirstBatch != null) {
+ nextBatch = currentFirstBatch;
+ currentFirstBatch = null;
+ } else {
+ nextBatch = currentReader.readBatch();
+ }
+ if (nextBatch == null) {
+ pool.recycler().recycle(iterator);
+ return finishSplit();
+ }
+ return FileRecords.forRecords(currentSplitId, iterator.replace(nextBatch));
+ }
+
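+ /**
+ * Blocks until the single pooled iterator is available again, i.e. until the previously
+ * returned batch has been released.
+ */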
+ private FileStoreRecordIterator pool() throws IOException {
+ try {
+ return this.pool.pollEntry();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for the record iterator pool", e);
+ }
+ }
+
+ @Override
+ public void handleSplitsChanges(SplitsChange<FileStoreSourceSplit> splitsChange) {
+ if (!(splitsChange instanceof SplitsAddition)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The SplitChange type of %s is not supported.",
+ splitsChange.getClass()));
+ }
+
+ splits.addAll(splitsChange.splits());
+ }
+
+ @Override
+ public void wakeUp() {}
+
+ @Override
+ public void close() throws Exception {
+ if (currentReader != null) {
+ currentReader.close();
+ }
+ }
+
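+ /** Opens a reader for the next queued split if none is open, seeking past already read records. */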
+ private void checkSplitOrStartNext() throws IOException {
+ if (currentReader != null) {
+ return;
+ }
+
+ final FileStoreSourceSplit nextSplit = splits.poll();
+ if (nextSplit == null) {
+ throw new IOException("Cannot fetch from another split - no split remaining");
+ }
+
+ currentSplitId = nextSplit.splitId();
+ currentReader =
+ fileStoreRead.createReader(
+ nextSplit.partition(), nextSplit.bucket(), nextSplit.files());
+ currentNumRead = nextSplit.recordsToSkip();
+ if (currentNumRead > 0) {
+ seek(currentNumRead);
+ }
+ }
+
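+ /**
+ * Skips {@code toSkip} records from the beginning of the split, e.g. after restoring from a
+ * checkpoint. The batch in which the skipping ends is kept as {@link #currentFirstBatch} so
+ * that the next fetch starts from the first unconsumed record.
+ */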
+ private void seek(long toSkip) throws IOException {
+ while (true) {
+ RecordReader.RecordIterator nextBatch = currentReader.readBatch();
+ if (nextBatch == null) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to skip %s records: no more batches remain in the split.", toSkip));
+ }
+ while (toSkip > 0 && nextBatch.next() != null) {
+ toSkip--;
+ }
+ if (toSkip == 0) {
+ currentFirstBatch = nextBatch;
+ return;
+ }
+ nextBatch.releaseBatch();
+ }
+ }
+
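+ /** Closes the current reader and returns the finished-split marker for the enclosing fetcher. */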
+ private FileRecords<RowData> finishSplit() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+
+ final FileRecords<RowData> finishRecords = FileRecords.finishedSplit(currentSplitId);
+ currentSplitId = null;
+ return finishRecords;
+ }
+
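+ /**
+ * Adapter that wraps a {@link RecordReader.RecordIterator} of {@link KeyValue}s as a
+ * {@link BulkFormat.RecordIterator} of {@link RowData}, tracking the number of records
+ * returned so far as the record-skip-count of each {@link RecordAndPosition}.
+ */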
+ private class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowData> {
+
+ private RecordReader.RecordIterator iterator;
+
+ private final MutableRecordAndPosition<RowData> recordAndPosition =
+ new MutableRecordAndPosition<>();
+
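+ /** Re-binds this pooled iterator to a new batch, resetting the position to the records read so far. */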
+ public FileStoreRecordIterator replace(RecordReader.RecordIterator iterator) {
+ this.iterator = iterator;
+ this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentNumRead);
+ return this;
+ }
+
+ @Nullable
+ @Override
+ public RecordAndPosition<RowData> next() {
+ try {
+ KeyValue kv = iterator.next();
+ if (kv == null) {
+ return null;
+ }
+ recordAndPosition.setNext(keyAsRecord ? kv.key() : kv.value());
+ currentNumRead++;
+ return recordAndPosition;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void releaseBatch() {
+ this.iterator.releaseBatch();
+ pool.recycler().recycle(this);
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
new file mode 100644
index 0000000..89068e6
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FileStoreSourceSplitReader}. */
+public class FileStoreSourceSplitReaderTest {
+
+ private static ExecutorService service;
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private final AtomicInteger v = new AtomicInteger(0);
+
+ @BeforeAll
+ public static void before() {
+ service = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterAll
+ public static void after() {
+ service.shutdownNow();
+ service = null;
+ }
+
+ @Test
+ public void testKeyAsRecord() throws Exception {
+ innerTestOnce(true);
+ }
+
+ @Test
+ public void testNonKeyAsRecord() throws Exception {
+ innerTestOnce(false);
+ }
+
+ private void innerTestOnce(boolean keyAsRecord) throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, keyAsRecord);
+
+ List<Tuple2<Integer, Integer>> input = kvs();
+ List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+ assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 0,
+ input.stream()
+ .map(tuple2 -> keyAsRecord ? tuple2._1 : tuple2._2)
+ .collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testMultipleBatchInSplit() throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+ List<Tuple2<Integer, Integer>> input1 = kvs();
+ List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+ List<Tuple2<Integer, Integer>> input2 = kvs();
+ List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+ files.addAll(files2);
+
+ assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 0,
+ input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 5,
+ input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testRestore() throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+ List<Tuple2<Integer, Integer>> input = kvs();
+ List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+ assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 3));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 3,
+ input.subList(3, input.size()).stream()
+ .map(Tuple2::_2)
+ .collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testRestoreMultipleBatchInSplit() throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+ List<Tuple2<Integer, Integer>> input1 = kvs();
+ List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+ List<Tuple2<Integer, Integer>> input2 = kvs();
+ List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+ files.addAll(files2);
+
+ assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 7));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 7,
+ input2.subList(2, input2.size()).stream()
+ .map(Tuple2::_2)
+ .collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testMultipleSplits() throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+ List<Tuple2<Integer, Integer>> input1 = kvs();
+ List<SstFileMeta> files1 = read.writeFiles(row(1), 0, input1);
+ assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files1));
+
+ List<Tuple2<Integer, Integer>> input2 = kvs();
+ List<SstFileMeta> files2 = read.writeFiles(row(2), 1, input2);
+ assignSplit(reader, new FileStoreSourceSplit("id2", row(2), 1, files2));
+
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id1",
+ 0,
+ input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id1", "id1", 0, null);
+
+ records = reader.fetch();
+ assertRecords(
+ records,
+ null,
+ "id2",
+ 0,
+ input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+ records = reader.fetch();
+ assertRecords(records, "id2", "id2", 0, null);
+
+ reader.close();
+ }
+
+ @Test
+ public void testNoSplit() throws Exception {
+ TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+ FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+ assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
+ reader.close();
+ }
+
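+ /**
+ * If {@code finishedSplit} is non-null, asserts that the fetch only reports that split as
+ * finished. Otherwise asserts that {@code nextSplit} produces the {@code expected} values,
+ * with record-skip-counts continuing from {@code startRecordSkipCount}.
+ */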
+ private void assertRecords(
+ RecordsWithSplitIds<RecordAndPosition<RowData>> records,
+ String finishedSplit,
+ String nextSplit,
+ long startRecordSkipCount,
+ List<Integer> expected) {
+ if (finishedSplit != null) {
+ assertThat(records.finishedSplits()).isEqualTo(Collections.singleton(finishedSplit));
+ return;
+ }
+ assertThat(records.finishedSplits()).isEmpty();
+
+ assertThat(records.nextSplit()).isEqualTo(nextSplit);
+
+ List<Integer> result = new ArrayList<>();
+ RecordAndPosition<RowData> record;
+ while ((record = records.nextRecordFromSplit()) != null) {
+ result.add(record.getRecord().getInt(0));
+ assertThat(record.getRecordSkipCount()).isEqualTo(++startRecordSkipCount);
+ }
+ records.recycle();
+
+ assertThat(result).isEqualTo(expected);
+ }
+
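+ /** Returns five (key, value) pairs drawn from the shared increasing counter. */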
+ private List<Tuple2<Integer, Integer>> kvs() {
+ List<Tuple2<Integer, Integer>> kvs = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ kvs.add(new Tuple2<>(next(), next()));
+ }
+ return kvs;
+ }
+
+ private int next() {
+ return v.incrementAndGet();
+ }
+
+ private void assignSplit(FileStoreSourceSplitReader reader, FileStoreSourceSplit split) {
+ SplitsChange<FileStoreSourceSplit> splitsChange =
+ new SplitsAddition<>(Collections.singletonList(split));
+ reader.handleSplitsChanges(splitsChange);
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
new file mode 100644
index 0000000..143d446
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import scala.Tuple2;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test {@link FileStoreRead}.
+ *
+ * <p>TODO: remove this, use FileStoreReadImpl.
+ */
+public class TestFileStoreRead implements FileStoreRead {
+
+ private static final Comparator<RowData> COMPARATOR = Comparator.comparingInt(o -> o.getInt(0));
+
+ private final FileStorePathFactory pathFactory;
+
+ private final ExecutorService service;
+
+ public TestFileStoreRead(Path root, ExecutorService service) {
+ this.pathFactory = new FileStorePathFactory(root, RowType.of(new IntType()), "default");
+ this.service = service;
+ }
+
+ @Override
+ public void withKeyProjection(int[][] projectedFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void withValueProjection(int[][] projectedFields) {
+ throw new UnsupportedOperationException();
+ }
+
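+ /** Creates a reader that merge-reads the given files, partitioned into sorted runs by key interval. */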
+ @Override
+ public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+ throws IOException {
+ MergeTree mergeTree = createMergeTree(partition, bucket);
+ List<List<SortedRun>> runs = new IntervalPartition(files, COMPARATOR).partition();
+ return mergeTree.createReader(runs, true);
+ }
+
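+ /** Builds a merge tree over avro SST files with a single int key column "k" and int value column "v". */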
+ private MergeTree createMergeTree(BinaryRowData partition, int bucket) {
+ MergeTreeOptions options = new MergeTreeOptions(new Configuration());
+ SstFile sstFile =
+ new SstFile.Factory(
+ new RowType(
+ singletonList(new RowType.RowField("k", new IntType()))),
+ new RowType(
+ singletonList(new RowType.RowField("v", new IntType()))),
+ FileFormat.fromIdentifier(
+ Thread.currentThread().getContextClassLoader(),
+ "avro",
+ new Configuration()),
+ pathFactory,
+ options.targetFileSize)
+ .create(partition, bucket);
+ return new MergeTree(options, sstFile, COMPARATOR, service, new DeduplicateAccumulator());
+ }
+
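+ /** Writes the key-value pairs as ADD records through a merge tree writer and returns the new SST files. */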
+ public List<SstFileMeta> writeFiles(
+ BinaryRowData partition, int bucket, List<Tuple2<Integer, Integer>> kvs)
+ throws Exception {
+ RecordWriter writer = createMergeTree(partition, bucket).createWriter(new ArrayList<>());
+ for (Tuple2<Integer, Integer> tuple2 : kvs) {
+ writer.write(ValueKind.ADD, GenericRowData.of(tuple2._1), GenericRowData.of(tuple2._2));
+ }
+ List<SstFileMeta> files = writer.prepareCommit().newFiles();
+ writer.close();
+ return new ArrayList<>(files);
+ }
+}