Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 08:55:16 UTC

[flink-table-store] branch master updated (8845586 -> 7ac835f)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git.


    from 8845586  [FLINK-25627] Add MemTable and Accumulator
     new 6194ecb  [FLINK-25628] Introduce RecordReader interface
     new 77c61b3  [FLINK-25628] Introduce SortMergeReader
     new 6102698  [FLINK-25628] Introduce ConcatRecordReader
     new 4ddd90c  [FLINK-25628] Introduce FieldStats
     new 9182e60  [FLINK-25628] Introduce SstPathFactory
     new 7ac835f  [FLINK-25628] Introduce SstFile and SstFileMeta

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../file/mergetree/compact/ConcatRecordReader.java |  84 +++++++
 .../file/mergetree/compact/SortMergeReader.java    | 213 ++++++++++++++++
 .../table/store/file/mergetree/sst/SstFile.java    | 269 +++++++++++++++++++++
 .../store/file/mergetree/sst/SstFileMeta.java      | 186 ++++++++++++++
 .../file/mergetree/sst/SstFileMetaSerializer.java  |  69 ++++++
 .../store/file/mergetree/sst/SstPathFactory.java   |  59 +++++
 .../flink/table/store/file/stats/FieldStats.java   |  70 ++++++
 .../file/stats/FieldStatsArraySerializer.java      | 100 ++++++++
 .../store/file/stats/FieldStatsCollector.java      |  82 +++++++
 .../flink/table/store/file/utils/FileUtils.java    |  76 ++++++
 .../flink/table/store/file/utils/RecordReader.java |  63 +++++
 .../store/file/utils/RecordReaderIterator.java     |  94 +++++++
 .../compact/CombiningRecordReaderTestBase.java     | 106 ++++++++
 .../mergetree/compact/ConcatRecordReaderTest.java  |  67 +++++
 .../mergetree/compact/SortMergeReaderTestBase.java | 123 ++++++++++
 .../mergetree/sst/SstFileMetaSerializerTest.java}  |  38 +--
 .../store/file/mergetree/sst/SstFileTest.java      | 245 +++++++++++++++++++
 .../file/mergetree/sst/SstPathFactoryTest.java     |  66 +++++
 .../file/mergetree/sst/SstTestDataGenerator.java   | 178 ++++++++++++++
 .../file/stats/FieldStatsArraySerializerTest.java  |  62 +++++
 .../store/file/stats/FieldStatsCollectorTest.java  |  98 ++++++++
 .../file/utils/FailingAtomicRenameFileSystem.java  | 143 +++++++++++
 .../store/file/utils/FileStorePathFactory.java     | 112 +++++++++
 .../file/utils/TestAtomicRenameFileSystem.java     | 121 +++++++++
 .../store/file/utils/TestReusingRecordReader.java  | 117 +++++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |  17 ++
 26 files changed, 2832 insertions(+), 26 deletions(-)
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
 copy flink-table-store-core/src/{main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateAccumulator.java => test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java} (52%)
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactoryTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
 create mode 100644 flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory

[flink-table-store] 01/06: [FLINK-25628] Introduce RecordReader interface

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 6194ecba528d0d257bc1123ee1b1e3f5666b493e
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:40:12 2022 +0800

    [FLINK-25628] Introduce RecordReader interface
    
    Co-authored-by: tsreaper <ts...@gmail.com>
    Co-authored-by: Jane Chan <55...@users.noreply.github.com>
---
 .../flink/table/store/file/utils/RecordReader.java | 64 +++++++++++++++
 .../store/file/utils/RecordReaderIterator.java     | 95 ++++++++++++++++++++++
 2 files changed, 159 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
new file mode 100644
index 0000000..89194e1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/** A reader that reads batches of records. */
+public interface RecordReader extends Closeable {
+
+    /**
+     * Reads one batch. The method should return null when reaching the end of the input.
+     *
+     * <p>The returned iterator object and any contained objects may be held onto by the caller
+     * for some time, so they should not be immediately reused by the reader.
+     */
+    @Nullable
+    RecordIterator readBatch() throws IOException;
+
+    /** Closes the reader and releases all resources. */
+    @Override
+    void close() throws IOException;
+
+    /**
+     * An internal iterator interface which presents a more restrictive API than {@link Iterator}.
+     */
+    interface RecordIterator {
+
+        /**
+         * Gets the next record from the iterator. Returns null if this iterator has no more
+         * elements.
+         */
+        KeyValue next() throws IOException;
+
+        /**
+         * Releases the batch that this iterator iterated over. This is not supposed to close the
+         * reader and its resources, but is simply a signal that this iterator is not used anymore.
+         * This method can be used as a hook to recycle/reuse heavyweight object structures.
+         */
+        void releaseBatch();
+    }
+}
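
For context, a minimal sketch of how a caller consumes this batch-oriented
interface (hypothetical code, not part of the commit; `reader` stands for any
RecordReader implementation; the same drain-then-release pattern appears in
CombiningRecordReaderTestBase later in this thread):

    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.utils.RecordReader;

    import java.io.IOException;

    public class RecordReaderUsageSketch {
        static void consume(RecordReader reader) throws IOException {
            RecordReader.RecordIterator batch;
            while ((batch = reader.readBatch()) != null) {
                KeyValue kv;
                while ((kv = batch.next()) != null) {
                    // process kv here; do not hold onto it, implementations may reuse objects
                }
                batch.releaseBatch(); // signal that the batch may be recycled
            }
            reader.close();
        }
    }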
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
new file mode 100644
index 0000000..58ad8e2
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.util.CloseableIterator;
+
+
+import java.io.IOException;
+
+/** Wraps a {@link RecordReader} as a {@link CloseableIterator}. */
+public class RecordReaderIterator implements CloseableIterator<KeyValue> {
+
+    private final RecordReader reader;
+    private RecordReader.RecordIterator currentIterator;
+    private boolean advanced;
+    private KeyValue currentResult;
+
+    public RecordReaderIterator(RecordReader reader) {
+        this.reader = reader;
+        try {
+            this.currentIterator = reader.readBatch();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        this.advanced = false;
+        this.currentResult = null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (currentIterator == null) {
+            return false;
+        }
+        advanceIfNeeded();
+        return currentResult != null;
+    }
+
+    @Override
+    public KeyValue next() {
+        if (!hasNext()) {
+            return null;
+        }
+        advanced = false;
+        return currentResult;
+    }
+
+    private void advanceIfNeeded() {
+        if (advanced) {
+            return;
+        }
+        advanced = true;
+
+        try {
+            while (true) {
+                currentResult = currentIterator.next();
+                if (currentResult != null) {
+                    break;
+                } else {
+                    currentIterator.releaseBatch();
+                    currentIterator = reader.readBatch();
+                    if (currentIterator == null) {
+                        break;
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentIterator != null) {
+            currentIterator.releaseBatch();
+        }
+        reader.close();
+    }
+}
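
A usage sketch (hypothetical, not part of the commit): because
CloseableIterator is AutoCloseable, the wrapper fits try-with-resources and
hides the batch bookkeeping shown above:

    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.utils.RecordReader;
    import org.apache.flink.table.store.file.utils.RecordReaderIterator;
    import org.apache.flink.util.CloseableIterator;

    public class IteratorUsageSketch {
        static void consume(RecordReader reader) throws Exception {
            try (CloseableIterator<KeyValue> it = new RecordReaderIterator(reader)) {
                while (it.hasNext()) {
                    KeyValue kv = it.next();
                    // process kv; objects may be reused by the underlying reader
                }
            } // close() releases the current batch and closes the wrapped reader
        }
    }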

[flink-table-store] 02/06: [FLINK-25628] Introduce SortMergeReader

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 77c61b33d0aa7c7a11dd9ace8159db49f803aa9b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:42:52 2022 +0800

    [FLINK-25628] Introduce SortMergeReader
    
    Co-authored-by: Jane Chan <55...@users.noreply.github.com>
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../file/mergetree/compact/SortMergeReader.java    | 213 +++++++++++++++++++++
 .../flink/table/store/file/utils/RecordReader.java |   1 -
 .../store/file/utils/RecordReaderIterator.java     |   1 -
 .../compact/CombiningRecordReaderTestBase.java     | 106 ++++++++++
 .../mergetree/compact/SortMergeReaderTestBase.java | 123 ++++++++++++
 .../store/file/utils/TestReusingRecordReader.java  | 117 +++++++++++
 6 files changed, 559 insertions(+), 2 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
new file mode 100644
index 0000000..0f3c184
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * This reader merges a list of {@link RecordReader}s, each of which is already sorted by key and
+ * sequence number, using a sort-merge algorithm. {@link KeyValue}s with the same key are
+ * combined during sort merging.
+ *
+ * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not contain the same key.
+ */
+public class SortMergeReader implements RecordReader {
+
+    private final List<RecordReader> nextBatchReaders;
+    private final Comparator<RowData> userKeyComparator;
+    private final Accumulator accumulator;
+
+    private final PriorityQueue<Element> minHeap;
+    private final List<Element> polled;
+
+    protected SortMergeReader(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        this.nextBatchReaders = new ArrayList<>(readers);
+        this.userKeyComparator = userKeyComparator;
+        this.accumulator = accumulator;
+
+        this.minHeap =
+                new PriorityQueue<>(
+                        (e1, e2) -> {
+                            int result = userKeyComparator.compare(e1.kv.key(), e2.kv.key());
+                            if (result != 0) {
+                                return result;
+                            }
+                            return Long.compare(e1.kv.sequenceNumber(), e2.kv.sequenceNumber());
+                        });
+        this.polled = new ArrayList<>();
+    }
+
+    public static RecordReader create(
+            List<RecordReader> readers,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator) {
+        return readers.size() == 1
+                ? readers.get(0)
+                : new SortMergeReader(readers, userKeyComparator, accumulator);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        for (RecordReader reader : nextBatchReaders) {
+            while (true) {
+                RecordIterator iterator = reader.readBatch();
+                if (iterator == null) {
+                    // no more batches, permanently remove this reader
+                    reader.close();
+                    break;
+                }
+                KeyValue kv = iterator.next();
+                if (kv == null) {
+                    // empty iterator, clean up and try next batch
+                    iterator.releaseBatch();
+                } else {
+                    // found next kv
+                    minHeap.offer(new Element(kv, iterator, reader));
+                    break;
+                }
+            }
+        }
+        nextBatchReaders.clear();
+
+        return minHeap.isEmpty() ? null : new SortMergeIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (RecordReader reader : nextBatchReaders) {
+            reader.close();
+        }
+        for (Element element : minHeap) {
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+        for (Element element : polled) {
+            element.iterator.releaseBatch();
+            element.reader.close();
+        }
+    }
+
+    /** An iterator over the merged result of {@link SortMergeReader}. */
+    private class SortMergeIterator implements RecordIterator {
+
+        private boolean released = false;
+
+        @Override
+        public KeyValue next() throws IOException {
+            while (true) {
+                boolean hasMore = nextImpl();
+                if (!hasMore) {
+                    return null;
+                }
+                RowData accumulatedValue = accumulator.getValue();
+                if (accumulatedValue != null) {
+                    return polled.get(polled.size() - 1).kv.setValue(accumulatedValue);
+                }
+            }
+        }
+
+        private boolean nextImpl() throws IOException {
+            Preconditions.checkState(
+                    !released, "SortMergeIterator#next is called after release");
+            Preconditions.checkState(
+                    nextBatchReaders.isEmpty(),
+                    "SortMergeIterator#advanceNext is called even if the last call returns null. "
+                            + "This is a bug.");
+
+            // add previously polled elements back to priority queue
+            for (Element element : polled) {
+                if (element.update()) {
+                    // still kvs left, add back to priority queue
+                    minHeap.offer(element);
+                } else {
+                    // reach end of batch, clean up
+                    element.iterator.releaseBatch();
+                    nextBatchReaders.add(element.reader);
+                }
+            }
+            polled.clear();
+
+            // there are readers reaching end of batch, so we end current batch
+            if (!nextBatchReaders.isEmpty()) {
+                return false;
+            }
+
+            accumulator.reset();
+            RowData key =
+                    Preconditions.checkNotNull(minHeap.peek(), "Min heap is empty. This is a bug.")
+                            .kv
+                            .key();
+
+            // fetch all elements with the same key
+            // note that a single iterator never produces duplicate keys, so polling each element once is correct
+            while (!minHeap.isEmpty()) {
+                Element element = minHeap.peek();
+                if (userKeyComparator.compare(key, element.kv.key()) != 0) {
+                    break;
+                }
+                minHeap.poll();
+                accumulator.add(element.kv.value());
+                polled.add(element);
+            }
+            return true;
+        }
+
+        @Override
+        public void releaseBatch() {
+            released = true;
+        }
+    }
+
+    private static class Element {
+        private KeyValue kv;
+        private final RecordIterator iterator;
+        private final RecordReader reader;
+
+        private Element(KeyValue kv, RecordIterator iterator, RecordReader reader) {
+            this.kv = kv;
+            this.iterator = iterator;
+            this.reader = reader;
+        }
+
+        // IMPORTANT: Must not call this for elements still in priority queue!
+        private boolean update() throws IOException {
+            KeyValue nextKv = iterator.next();
+            if (nextKv == null) {
+                return false;
+            }
+            kv = nextKv;
+            return true;
+        }
+    }
+}
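
As a usage sketch (hypothetical, not part of the commit): create()
short-circuits the single-reader case, so callers always go through the
factory method. This assumes int keys in field 0 of the key row and the
DeduplicateAccumulator from FLINK-25627:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
    import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
    import org.apache.flink.table.store.file.utils.RecordReader;

    import java.util.Comparator;
    import java.util.List;

    public class SortMergeUsageSketch {
        static RecordReader merge(List<RecordReader> sortedReaders) {
            Comparator<RowData> keyComparator = (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
            // yields one combined KeyValue per distinct key, in key order
            return SortMergeReader.create(
                    sortedReaders, keyComparator, new DeduplicateAccumulator());
        }
    }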
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
index 89194e1..5d9113d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.store.file.KeyValue;
 
-
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
index 58ad8e2..8353965 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.util.CloseableIterator;
 
-
 import java.io.IOException;
 
 /** Wraps a {@link RecordReader} as a {@link CloseableIterator}. */
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
new file mode 100644
index 0000000..6fb2ac6
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CombiningRecordReaderTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RecordReader}s which combine several other {@link RecordReader}s. */
+public abstract class CombiningRecordReaderTestBase {
+
+    protected static final RecordComparator KEY_COMPARATOR =
+            (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+    protected abstract boolean addOnly();
+
+    protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> input);
+
+    protected abstract RecordReader createRecordReader(List<TestReusingRecordReader> readers);
+
+    @RepeatedTest(100)
+    public void testRandom() throws IOException {
+        runTest(generateRandomData());
+    }
+
+    protected List<List<ReusingTestData>> parseData(String... stringsData) {
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (String stringData : stringsData) {
+            readersData.add(ReusingTestData.parse(stringData));
+        }
+        return readersData;
+    }
+
+    protected List<List<ReusingTestData>> generateRandomData() {
+        Random random = new Random();
+        int numReaders = random.nextInt(20) + 1;
+        List<List<ReusingTestData>> readersData = new ArrayList<>();
+        for (int i = 0; i < numReaders; i++) {
+            readersData.add(
+                    ReusingTestData.generateOrderedNoDuplicatedKeys(
+                            random.nextInt(100) + 1, addOnly()));
+        }
+        return readersData;
+    }
+
+    protected void runTest(List<List<ReusingTestData>> readersData) throws IOException {
+        Iterator<ReusingTestData> expectedIterator =
+                getExpected(
+                                readersData.stream()
+                                        .flatMap(Collection::stream)
+                                        .collect(Collectors.toList()))
+                        .iterator();
+        List<TestReusingRecordReader> readers = new ArrayList<>();
+        for (List<ReusingTestData> readerData : readersData) {
+            readers.add(new TestReusingRecordReader(readerData));
+        }
+        RecordReader recordReader = createRecordReader(readers);
+
+        RecordReader.RecordIterator batch;
+        while ((batch = recordReader.readBatch()) != null) {
+            KeyValue kv;
+            while ((kv = batch.next()) != null) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                ReusingTestData expected = expectedIterator.next();
+                expected.assertEquals(kv);
+            }
+            batch.releaseBatch();
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+        recordReader.close();
+
+        for (TestReusingRecordReader reader : readers) {
+            reader.assertCleanUp();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
new file mode 100644
index 0000000..d77fba0
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReaderTestBase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link SortMergeReader}. */
+public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestBase {
+
+    protected abstract Accumulator createAccumulator();
+
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
+        return new SortMergeReader(new ArrayList<>(readers), KEY_COMPARATOR, createAccumulator());
+    }
+
+    @Test
+    public void testEmpty() throws IOException {
+        runTest(parseData(""));
+        runTest(parseData("", "", ""));
+    }
+
+    @Test
+    public void testAlternateKeys() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, +, 200 | 7, 4, +, 600 | 9, 20, +, 400",
+                        "0, 5, +, 0",
+                        "0, 10, +, 0",
+                        "",
+                        "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +, 800"));
+    }
+
+    @Test
+    public void testDuplicateKeys() throws IOException {
+        runTest(parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5, +, 300"));
+    }
+
+    @Test
+    public void testLongTailRecords() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 2, 500, +, 200",
+                        "1, 3, +, 100 | 3, 4, +, 300 | 5, 501, +, 500 | 7, 503, +, 700 | "
+                                + "8, 504, +, 800 | 9, 505, +, 900 | 10, 506, +, 1000 | "
+                                + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 509, +, 1300"));
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link DeduplicateAccumulator}. */
+    public static class WithDeduplicateAccumulator extends SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return false;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return AccumulatorTestUtils.getExpectedForDeduplicate(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new DeduplicateAccumulator();
+        }
+    }
+
+    /** Tests for {@link SortMergeReader} with {@link ValueCountAccumulator}. */
+    public static class WithValueRecordAccumulatorTest extends SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return true;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+            return AccumulatorTestUtils.getExpectedForValueCount(input);
+        }
+
+        @Override
+        protected Accumulator createAccumulator() {
+            return new ValueCountAccumulator();
+        }
+
+        @Test
+        public void testCancelingRecords() throws IOException {
+            runTest(
+                    parseData(
+                            "1, 1, +, 100 | 3, 5, +, -300 | 5, 300, +, 300",
+                            "",
+                            "1, 4, +, -200 | 3, 3, +, 300",
+                            "5, 100, +, -200 | 7, 123, +, -500",
+                            "7, 321, +, 200",
+                            "7, 456, +, 300"));
+            runTest(parseData("1, 2, +, 100", "1, 1, +, -100"));
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
new file mode 100644
index 0000000..a59ef95
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestReusingRecordReader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A testing {@link RecordReader} using {@link ReusingTestData} which produces batches of random
+ * sizes (possibly empty). {@link KeyValue}s produced by the same reader are reused to test that
+ * other components correctly handle the reuse.
+ */
+public class TestReusingRecordReader implements RecordReader {
+
+    private final List<ReusingTestData> testData;
+    private final ReusingKeyValue reuse;
+
+    private final List<TestRecordIterator> producedBatches;
+    private final Random random;
+
+    private int nextLowerBound;
+    private boolean closed;
+
+    public TestReusingRecordReader(List<ReusingTestData> testData) {
+        this.testData = testData;
+        this.reuse = new ReusingKeyValue();
+
+        this.producedBatches = new ArrayList<>();
+        this.random = new Random();
+
+        this.nextLowerBound = 0;
+        this.closed = false;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() {
+        assertThat(nextLowerBound != -1).isTrue();
+        if (nextLowerBound == testData.size() && random.nextBoolean()) {
+            nextLowerBound = -1;
+            return null;
+        }
+        int upperBound = random.nextInt(testData.size() - nextLowerBound + 1) + nextLowerBound;
+        TestRecordIterator iterator = new TestRecordIterator(nextLowerBound, upperBound);
+        nextLowerBound = upperBound;
+        producedBatches.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+    }
+
+    public void assertCleanUp() {
+        assertThat(closed).isTrue();
+        for (TestRecordIterator iterator : producedBatches) {
+            assertThat(iterator.released).isTrue();
+        }
+    }
+
+    private class TestRecordIterator implements RecordIterator {
+
+        private final int upperBound;
+
+        private int next;
+        private boolean released;
+
+        private TestRecordIterator(int lowerBound, int upperBound) {
+            this.upperBound = upperBound;
+
+            this.next = lowerBound;
+            this.released = false;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            assertThat(next != -1).isTrue();
+            if (next == upperBound) {
+                next = -1;
+                return null;
+            }
+            KeyValue result = reuse.update(testData.get(next));
+            next++;
+            return result;
+        }
+
+        @Override
+        public void releaseBatch() {
+            this.released = true;
+        }
+    }
+}

[flink-table-store] 05/06: [FLINK-25628] Introduce SstPathFactory

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 9182e60c8ba51616de2905fd0dbb6e2d4b66d055
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:51:13 2022 +0800

    [FLINK-25628] Introduce SstPathFactory
    
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../store/file/mergetree/sst/SstPathFactory.java   | 59 +++++++++++++++++++
 .../file/mergetree/sst/SstPathFactoryTest.java     | 66 ++++++++++++++++++++++
 2 files changed, 125 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
new file mode 100644
index 0000000..cfda5c3
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/** Factory which produces new {@link Path}s for sst files. */
+public class SstPathFactory {
+
+    private final Path bucketDir;
+    private final String uuid;
+
+    private int pathCount;
+
+    public SstPathFactory(Path root, @Nullable String partition, int bucket) {
+        if (partition == null) {
+            this.bucketDir = new Path(root + "/bucket-" + bucket);
+        } else {
+            this.bucketDir = new Path(root + "/" + partition + "/bucket-" + bucket);
+        }
+        this.uuid = UUID.randomUUID().toString();
+
+        this.pathCount = 0;
+    }
+
+    public Path newPath() {
+        return new Path(bucketDir + "/sst-" + uuid + "-" + (pathCount++));
+    }
+
+    public Path toPath(String fileName) {
+        return new Path(bucketDir + "/" + fileName);
+    }
+
+    @VisibleForTesting
+    public String uuid() {
+        return uuid;
+    }
+}
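
A usage sketch (hypothetical paths, not part of the commit): one factory
instance is scoped to a single partition/bucket directory and hands out
unique, monotonically numbered sst paths:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;

    public class SstPathFactoryUsageSketch {
        public static void main(String[] args) {
            SstPathFactory factory =
                    new SstPathFactory(new Path("/warehouse/t"), "dt=20220101", 0);
            Path p0 = factory.newPath(); // /warehouse/t/dt=20220101/bucket-0/sst-<uuid>-0
            Path p1 = factory.newPath(); // ...sst-<uuid>-1, counter increments per call
            Path known = factory.toPath("sst-<uuid>-7"); // resolve an existing file name
        }
    }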
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactoryTest.java
new file mode 100644
index 0000000..315e85c
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactoryTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SstPathFactory}. */
+public class SstPathFactoryTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testNoPartition() {
+        SstPathFactory pathFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
+        String uuid = pathFactory.uuid();
+
+        for (int i = 0; i < 20; i++) {
+            assertThat(pathFactory.newPath())
+                    .isEqualTo(new Path(tempDir.toString() + "/bucket-123/sst-" + uuid + "-" + i));
+        }
+        assertThat(pathFactory.toPath("my-sst-file-name"))
+                .isEqualTo(new Path(tempDir.toString() + "/bucket-123/my-sst-file-name"));
+    }
+
+    @Test
+    public void testWithPartition() {
+        SstPathFactory pathFactory =
+                new SstPathFactory(new Path(tempDir.toString()), "dt=20211224", 123);
+        String uuid = pathFactory.uuid();
+
+        for (int i = 0; i < 20; i++) {
+            assertThat(pathFactory.newPath())
+                    .isEqualTo(
+                            new Path(
+                                    tempDir.toString()
+                                            + "/dt=20211224/bucket-123/sst-"
+                                            + uuid
+                                            + "-"
+                                            + i));
+        }
+        assertThat(pathFactory.toPath("my-sst-file-name"))
+                .isEqualTo(
+                        new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-sst-file-name"));
+    }
+}

[flink-table-store] 04/06: [FLINK-25628] Introduce FieldStats

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 4ddd90cc69eed8178901c44222e6ebbc06bb87a1
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:38:20 2022 +0800

    [FLINK-25628] Introduce FieldStats
    
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../flink/table/store/file/stats/FieldStats.java   |  70 +++++++++++++++
 .../file/stats/FieldStatsArraySerializer.java      | 100 +++++++++++++++++++++
 .../store/file/stats/FieldStatsCollector.java      |  82 +++++++++++++++++
 .../file/stats/FieldStatsArraySerializerTest.java  |  62 +++++++++++++
 .../store/file/stats/FieldStatsCollectorTest.java  |  98 ++++++++++++++++++++
 5 files changed, 412 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
new file mode 100644
index 0000000..62843bf
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStats.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Statistics for each field. */
+public class FieldStats {
+
+    @Nullable private final Object minValue;
+    @Nullable private final Object maxValue;
+    private final long nullCount;
+
+    public FieldStats(Object minValue, Object maxValue, long nullCount) {
+        this.minValue = minValue;
+        this.maxValue = maxValue;
+        this.nullCount = nullCount;
+    }
+
+    public Object minValue() {
+        return minValue;
+    }
+
+    public Object maxValue() {
+        return maxValue;
+    }
+
+    public long nullCount() {
+        return nullCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof FieldStats)) {
+            return false;
+        }
+        FieldStats that = (FieldStats) o;
+        return Objects.equals(minValue, that.minValue)
+                && Objects.equals(maxValue, that.maxValue)
+                && nullCount == that.nullCount;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(minValue, maxValue, nullCount);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{%s, %s, %d}", minValue, maxValue, nullCount);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
new file mode 100644
index 0000000..6a7cc7f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Serializer for array of {@link FieldStats}. */
+public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
+
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public FieldStatsArraySerializer(RowType rowType) {
+        super(schema(rowType));
+        this.fieldGetters = createFieldGetters(toAllFieldsNullableRowType(rowType));
+    }
+
+    @Override
+    public RowData toRow(FieldStats[] stats) {
+        int rowFieldCount = stats.length;
+        GenericRowData minValues = new GenericRowData(rowFieldCount);
+        GenericRowData maxValues = new GenericRowData(rowFieldCount);
+        long[] nullCounts = new long[rowFieldCount];
+        for (int i = 0; i < rowFieldCount; i++) {
+            minValues.setField(i, stats[i].minValue());
+            maxValues.setField(i, stats[i].maxValue());
+            nullCounts[i] = stats[i].nullCount();
+        }
+        return GenericRowData.of(minValues, maxValues, new GenericArrayData(nullCounts));
+    }
+
+    @Override
+    public FieldStats[] fromRow(RowData row) {
+        int rowFieldCount = fieldGetters.length;
+        RowData minValues = row.getRow(0, rowFieldCount);
+        RowData maxValues = row.getRow(1, rowFieldCount);
+        long[] nullValues = row.getArray(2).toLongArray();
+
+        FieldStats[] stats = new FieldStats[rowFieldCount];
+        for (int i = 0; i < rowFieldCount; i++) {
+            stats[i] =
+                    new FieldStats(
+                            fieldGetters[i].getFieldOrNull(minValues),
+                            fieldGetters[i].getFieldOrNull(maxValues),
+                            nullValues[i]);
+        }
+        return stats;
+    }
+
+    public static RowType schema(RowType rowType) {
+        rowType = toAllFieldsNullableRowType(rowType);
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_MIN_VALUES", rowType));
+        fields.add(new RowType.RowField("_MAX_VALUES", rowType));
+        fields.add(new RowType.RowField("_NULL_COUNTS", new ArrayType(new BigIntType(false))));
+        return new RowType(fields);
+    }
+
+    public static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                .toArray(RowData.FieldGetter[]::new);
+    }
+
+    private static RowType toAllFieldsNullableRowType(RowType rowType) {
+        // as stated in SstFile.RollingFile#finish, field stats are not collected currently so
+        // min/max values are all nulls
+        return RowType.of(
+                rowType.getFields().stream()
+                        .map(f -> f.getType().copy(true))
+                        .toArray(LogicalType[]::new),
+                rowType.getFieldNames().toArray(new String[0]));
+    }
+}
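
A round-trip sketch (hypothetical usage, not part of the commit): the
serializer packs per-field minima, maxima and null counts into a three-field
row matching schema(rowType), and fromRow restores the array. It assumes
stats.length equals the field count of rowType:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.stats.FieldStats;
    import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    public class StatsSerializerSketch {
        static FieldStats[] roundTrip(FieldStats[] stats) {
            RowType rowType = RowType.of(new IntType()); // schema of the original records
            FieldStatsArraySerializer serializer = new FieldStatsArraySerializer(rowType);
            RowData packed = serializer.toRow(stats);    // {_MIN_VALUES, _MAX_VALUES, _NULL_COUNTS}
            return serializer.fromRow(packed);           // equal to `stats`, field by field
        }
    }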
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
new file mode 100644
index 0000000..d7fa281
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+/** Collector to extract statistics of each field from a series of records. */
+public class FieldStatsCollector {
+
+    private final Object[] minValues;
+    private final Object[] maxValues;
+    private final long[] nullCounts;
+
+    private final RowData.FieldGetter[] fieldGetters;
+
+    public FieldStatsCollector(RowType rowType) {
+        int numFields = rowType.getFieldCount();
+        this.minValues = new Object[numFields];
+        this.maxValues = new Object[numFields];
+        this.nullCounts = new long[numFields];
+        this.fieldGetters = FieldStatsArraySerializer.createFieldGetters(rowType);
+    }
+
+    /**
+     * Update the statistics with a new row data.
+     *
+     * <p><b>IMPORTANT</b>: Fields of this row should NOT be reused, as they're directly stored in
+     * the collector.
+     */
+    public void collect(RowData row) {
+        Preconditions.checkArgument(
+                fieldGetters.length == row.getArity(),
+                "Expecting row data with %d fields but found row data with %d fields",
+                fieldGetters.length,
+                row.getArity());
+        for (int i = 0; i < row.getArity(); i++) {
+            Object obj = fieldGetters[i].getFieldOrNull(row);
+            if (obj == null) {
+                nullCounts[i]++;
+                continue;
+            }
+
+            // TODO use comparator for not comparable types and extract this logic to a util class
+            if (!(obj instanceof Comparable)) {
+                continue;
+            }
+            Comparable<Object> c = (Comparable<Object>) obj;
+            if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
+                minValues[i] = c;
+            }
+            if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
+                maxValues[i] = c;
+            }
+        }
+    }
+
+    public FieldStats[] extract() {
+        FieldStats[] stats = new FieldStats[fieldGetters.length];
+        for (int i = 0; i < stats.length; i++) {
+            stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
+        }
+        return stats;
+    }
+}
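
A worked sketch of the collector (hypothetical data, not part of the commit),
feeding two rows of an (INT, VARCHAR) schema; the expected stats follow from
the collect() logic above:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.file.stats.FieldStats;
    import org.apache.flink.table.store.file.stats.FieldStatsCollector;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class StatsCollectorSketch {
        public static void main(String[] args) {
            RowType rowType = RowType.of(new IntType(), new VarCharType(10));
            FieldStatsCollector collector = new FieldStatsCollector(rowType);
            collector.collect(GenericRowData.of(1, StringData.fromString("a")));
            collector.collect(GenericRowData.of(null, StringData.fromString("b")));
            FieldStats[] stats = collector.extract();
            // stats[0] -> {1, 1, 1}  (min 1, max 1, one null seen)
            // stats[1] -> {a, b, 0}  (min "a", max "b", no nulls)
        }
    }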
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
new file mode 100644
index 0000000..520be75
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsArraySerializer}. */
+public class FieldStatsArraySerializerTest extends ObjectSerializerTestBase<FieldStats[]> {
+
+    private final TestKeyValueGenerator gen = new TestKeyValueGenerator();
+
+    @Override
+    protected ObjectSerializer<FieldStats[]> serializer() {
+        return new FieldStatsArraySerializer(TestKeyValueGenerator.ROW_TYPE);
+    }
+
+    @Override
+    protected FieldStats[] object() {
+        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        for (int i = 0; i < 10; i++) {
+            collector.collect(gen.next().value());
+        }
+        FieldStats[] result = collector.extract();
+
+        // as stated in SstFile.RollingFile#finish, field stats are not currently collected, so
+        // min/max values are all null
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numFieldsNotCollected = random.nextInt(result.length + 1);
+        for (int i = 0; i < numFieldsNotCollected; i++) {
+            result[random.nextInt(result.length)] = new FieldStats(null, null, 0);
+        }
+
+        return result;
+    }
+
+    @Override
+    protected void checkResult(FieldStats[] expected, FieldStats[] actual) {
+        assertThat(actual).isEqualTo(expected);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
new file mode 100644
index 0000000..36630f4
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsCollector}. */
+public class FieldStatsCollectorTest {
+
+    @Test
+    public void testCollect() {
+        RowType rowType =
+                RowType.of(new IntType(), new VarCharType(10), new ArrayType(new IntType()));
+        FieldStatsCollector collector = new FieldStatsCollector(rowType);
+
+        collector.collect(
+                GenericRowData.of(
+                        1,
+                        StringData.fromString("Flink"),
+                        new GenericArrayData(new int[] {1, 10})));
+        assertThat(collector.extract())
+                .isEqualTo(
+                        new FieldStats[] {
+                            new FieldStats(1, 1, 0),
+                            new FieldStats(
+                                    StringData.fromString("Flink"),
+                                    StringData.fromString("Flink"),
+                                    0),
+                            new FieldStats(null, null, 0)
+                        });
+
+        collector.collect(GenericRowData.of(3, null, new GenericArrayData(new int[] {3, 30})));
+        assertThat(collector.extract())
+                .isEqualTo(
+                        new FieldStats[] {
+                            new FieldStats(1, 3, 0),
+                            new FieldStats(
+                                    StringData.fromString("Flink"),
+                                    StringData.fromString("Flink"),
+                                    1),
+                            new FieldStats(null, null, 0)
+                        });
+
+        collector.collect(
+                GenericRowData.of(
+                        null,
+                        StringData.fromString("Apache"),
+                        new GenericArrayData(new int[] {2, 20})));
+        assertThat(collector.extract())
+                .isEqualTo(
+                        new FieldStats[] {
+                            new FieldStats(1, 3, 1),
+                            new FieldStats(
+                                    StringData.fromString("Apache"),
+                                    StringData.fromString("Flink"),
+                                    1),
+                            new FieldStats(null, null, 0)
+                        });
+
+        collector.collect(GenericRowData.of(2, StringData.fromString("Batch"), null));
+        assertThat(collector.extract())
+                .isEqualTo(
+                        new FieldStats[] {
+                            new FieldStats(1, 3, 1),
+                            new FieldStats(
+                                    StringData.fromString("Apache"),
+                                    StringData.fromString("Flink"),
+                                    1),
+                            new FieldStats(null, null, 1)
+                        });
+    }
+}

[flink-table-store] 06/06: [FLINK-25628] Introduce SstFile and SstFileMeta

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 7ac835ffe828a511689ac080d90d2a729aa08d89
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:56:26 2022 +0800

    [FLINK-25628] Introduce SstFile and SstFileMeta
    
    This closes #4
    
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../table/store/file/mergetree/sst/SstFile.java    | 269 +++++++++++++++++++++
 .../store/file/mergetree/sst/SstFileMeta.java      | 186 ++++++++++++++
 .../file/mergetree/sst/SstFileMetaSerializer.java  |  69 ++++++
 .../flink/table/store/file/utils/FileUtils.java    |  76 ++++++
 .../mergetree/sst/SstFileMetaSerializerTest.java   |  39 +++
 .../store/file/mergetree/sst/SstFileTest.java      | 245 +++++++++++++++++++
 .../file/mergetree/sst/SstTestDataGenerator.java   | 178 ++++++++++++++
 .../file/utils/FailingAtomicRenameFileSystem.java  | 143 +++++++++++
 .../store/file/utils/FileStorePathFactory.java     | 112 +++++++++
 .../file/utils/TestAtomicRenameFileSystem.java     | 121 +++++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |  17 ++
 11 files changed, 1455 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
new file mode 100644
index 0000000..d73ff45
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reads and writes SST files. Each file contains several {@link KeyValue}s, representing the
+ * changes inserted into the file storage.
+ */
+public class SstFile {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SstFile.class);
+
+    private final RowType keyType;
+    private final RowType valueType;
+
+    private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final SstPathFactory pathFactory;
+    private final long suggestedFileSize;
+
+    public SstFile(
+            RowType keyType,
+            RowType valueType,
+            FileFormat fileFormat,
+            SstPathFactory pathFactory,
+            long suggestedFileSize) {
+        this.keyType = keyType;
+        this.valueType = valueType;
+
+        RowType recordType = KeyValue.schema(keyType, valueType);
+        this.readerFactory = fileFormat.createReaderFactory(recordType);
+        this.writerFactory = fileFormat.createWriterFactory(recordType);
+        this.pathFactory = pathFactory;
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    public RowType keyType() {
+        return keyType;
+    }
+
+    public RowType valueType() {
+        return valueType;
+    }
+
+    @VisibleForTesting
+    public long suggestedFileSize() {
+        return suggestedFileSize;
+    }
+
+    public RecordReader read(String fileName) throws IOException {
+        return new SstFileRecordReader(pathFactory.toPath(fileName));
+    }
+
+    /**
+     * Writes several {@link KeyValue}s into sst files of the given level, rolling over to a new
+     * file once the suggested file size is exceeded.
+     *
+     * <p>NOTE: This method is atomic: if writing fails, all files written so far are deleted.
+     */
+    public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
+            throws Exception {
+        List<SstFileMeta> result = new ArrayList<>();
+
+        RollingFile rollingFile = null;
+        Path currentPath = null;
+        try {
+            while (iterator.hasNext()) {
+                if (rollingFile == null) {
+                    currentPath = pathFactory.newPath();
+                    rollingFile = new RollingFile(currentPath, suggestedFileSize);
+                }
+                rollingFile.write(iterator.next());
+                if (rollingFile.exceedsSuggestedFileSize()) {
+                    result.add(rollingFile.finish(level));
+                    rollingFile = null;
+                }
+            }
+            // finish last file
+            if (rollingFile != null) {
+                result.add(rollingFile.finish(level));
+            }
+            iterator.close();
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing sst files. Cleaning up.", e);
+            // clean up finished files
+            for (SstFileMeta meta : result) {
+                FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName()));
+            }
+            // clean up in-progress file
+            if (currentPath != null) {
+                FileUtils.deleteOrWarn(currentPath);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    public void delete(SstFileMeta file) {
+        FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
+    }
+
+    private class SstFileRecordReader implements RecordReader {
+
+        private final BulkFormat.Reader<RowData> reader;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordReader(Path path) throws IOException {
+            long fileSize = FileUtils.getFileSize(path);
+            FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+            this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator readBatch() throws IOException {
+            BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+            return iterator == null ? null : new SstFileRecordIterator(iterator, serializer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    private static class SstFileRecordIterator implements RecordReader.RecordIterator {
+
+        private final BulkFormat.RecordIterator<RowData> iterator;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordIterator(
+                BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) {
+            this.iterator = iterator;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            RecordAndPosition<RowData> result = iterator.next();
+            return result == null ? null : serializer.fromRow(result.getRecord());
+        }
+
+        @Override
+        public void releaseBatch() {
+            iterator.releaseBatch();
+        }
+    }
+
+    private class RollingFile {
+        private final Path path;
+        private final long suggestedFileSize;
+
+        private final FSDataOutputStream out;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer serializer;
+        private final RowDataSerializer keySerializer;
+
+        private long rowCount;
+        private BinaryRowData minKey;
+        private RowData maxKey;
+        private long minSequenceNumber;
+        private long maxSequenceNumber;
+
+        private RollingFile(Path path, long suggestedFileSize) throws IOException {
+            this.path = path;
+            this.suggestedFileSize = suggestedFileSize;
+
+            this.out =
+                    this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE);
+            this.writer = writerFactory.create(out);
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+            this.keySerializer = new RowDataSerializer(keyType);
+
+            this.rowCount = 0;
+            this.minKey = null;
+            this.maxKey = null;
+            this.minSequenceNumber = Long.MAX_VALUE;
+            this.maxSequenceNumber = Long.MIN_VALUE;
+        }
+
+        private void write(KeyValue kv) throws IOException {
+            writer.addElement(serializer.toRow(kv));
+
+            rowCount++;
+            if (minKey == null) {
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+            maxKey = kv.key();
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        private boolean exceedsSuggestedFileSize() throws IOException {
+            // NOTE: this method is inaccurate for formats buffering changes in memory
+            return out.getPos() >= suggestedFileSize;
+        }
+
+        private SstFileMeta finish(int level) throws IOException {
+            writer.finish();
+            out.close();
+
+            // TODO
+            //  1. Read statistics directly from the written orc/parquet files.
+            //  2. For other file formats use StatsCollector. Make sure fields are not reused
+            //     otherwise we need copying.
+            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
+            for (int i = 0; i < stats.length; i++) {
+                stats[i] = new FieldStats(null, null, 0);
+            }
+
+            return new SstFileMeta(
+                    path.getName(),
+                    FileUtils.getFileSize(path),
+                    rowCount,
+                    minKey,
+                    keySerializer.toBinaryRow(maxKey).copy(),
+                    stats,
+                    minSequenceNumber,
+                    maxSequenceNumber,
+                    level);
+        }
+    }
+}
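
A hedged end-to-end sketch of the write/read cycle (keyType, valueType,
fileFormat, pathFactory and the sorted kvs list are assumed to be set up as in
SstFileTest further below; the call site must handle Exception):

    SstFile sstFile = new SstFile(keyType, valueType, fileFormat, pathFactory, 1024 * 1024);
    // write may roll over to several files; each gets its own metadata
    List<SstFileMeta> metas = sstFile.write(CloseableIterator.fromList(kvs, kv -> {}), 0);
    // read the first rolled file back as KeyValues
    try (CloseableIterator<KeyValue> it =
            new RecordReaderIterator(sstFile.read(metas.get(0).fileName()))) {
        while (it.hasNext()) {
            KeyValue kv = it.next();
            // ... consume kv
        }
    }
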
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
new file mode 100644
index 0000000..5a7e99a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Metadata of an SST file. */
+public class SstFileMeta {
+
+    private final String fileName;
+    private final long fileSize;
+    private final long rowCount;
+
+    private final BinaryRowData minKey;
+    private final BinaryRowData maxKey;
+    private final FieldStats[] stats;
+
+    private final long minSequenceNumber;
+    private final long maxSequenceNumber;
+    private final int level;
+
+    public SstFileMeta(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryRowData minKey,
+            BinaryRowData maxKey,
+            FieldStats[] stats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            int level) {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.rowCount = rowCount;
+
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+        this.stats = stats;
+
+        this.minSequenceNumber = minSequenceNumber;
+        this.maxSequenceNumber = maxSequenceNumber;
+        this.level = level;
+    }
+
+    public String fileName() {
+        return fileName;
+    }
+
+    public long fileSize() {
+        return fileSize;
+    }
+
+    public long rowCount() {
+        return rowCount;
+    }
+
+    public BinaryRowData minKey() {
+        return minKey;
+    }
+
+    public BinaryRowData maxKey() {
+        return maxKey;
+    }
+
+    /** An element may be null, indicating that the statistics of this field are unknown. */
+    public FieldStats[] stats() {
+        return stats;
+    }
+
+    public long minSequenceNumber() {
+        return minSequenceNumber;
+    }
+
+    public long maxSequenceNumber() {
+        return maxSequenceNumber;
+    }
+
+    public int level() {
+        return level;
+    }
+
+    public SstFileMeta upgrade(int newLevel) {
+        checkArgument(newLevel > this.level);
+        return new SstFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                stats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                newLevel);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SstFileMeta)) {
+            return false;
+        }
+        SstFileMeta that = (SstFileMeta) o;
+        return Objects.equals(fileName, that.fileName)
+                && fileSize == that.fileSize
+                && rowCount == that.rowCount
+                && Objects.equals(minKey, that.minKey)
+                && Objects.equals(maxKey, that.maxKey)
+                && Arrays.equals(stats, that.stats)
+                && minSequenceNumber == that.minSequenceNumber
+                && maxSequenceNumber == that.maxSequenceNumber
+                && level == that.level;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                // By default, the hash code of an array is computed by reference, not by content,
+                // so we must use Arrays.hashCode to hash by content.
+                Arrays.hashCode(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "{%s, %d, %d, %s, %s, %s, %d, %d, %d}",
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                Arrays.toString(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    public static RowType schema(RowType keyType, RowType rowType) {
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
+        fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
+        fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MIN_KEY", keyType));
+        fields.add(new RowType.RowField("_MAX_KEY", keyType));
+        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType)));
+        fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
+        return new RowType(fields);
+    }
+}
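
For illustration (types chosen arbitrarily), the metadata schema embeds the key
type twice and the stats schema once:

    RowType keyType = RowType.of(new IntType());
    RowType valueType = RowType.of(new BigIntType(), new VarCharType(10));
    RowType metaSchema = SstFileMeta.schema(keyType, valueType);
    // 9 fields: _FILE_NAME, _FILE_SIZE, _ROW_COUNT, _MIN_KEY, _MAX_KEY,
    //           _STATS, _MIN_SEQUENCE_NUMBER, _MAX_SEQUENCE_NUMBER, _LEVEL
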
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
new file mode 100644
index 0000000..0e176d1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Serializer for {@link SstFileMeta}. */
+public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
+
+    private final RowDataSerializer keySerializer;
+    private final FieldStatsArraySerializer statsArraySerializer;
+
+    public SstFileMetaSerializer(RowType keyType, RowType rowType) {
+        super(SstFileMeta.schema(keyType, rowType));
+        this.keySerializer = new RowDataSerializer(keyType);
+        this.statsArraySerializer = new FieldStatsArraySerializer(rowType);
+    }
+
+    @Override
+    public RowData toRow(SstFileMeta meta) {
+        return GenericRowData.of(
+                StringData.fromString(meta.fileName()),
+                meta.fileSize(),
+                meta.rowCount(),
+                meta.minKey(),
+                meta.maxKey(),
+                statsArraySerializer.toRow(meta.stats()),
+                meta.minSequenceNumber(),
+                meta.maxSequenceNumber(),
+                meta.level());
+    }
+
+    @Override
+    public SstFileMeta fromRow(RowData row) {
+        int keyFieldCount = keySerializer.getArity();
+        return new SstFileMeta(
+                row.getString(0).toString(),
+                row.getLong(1),
+                row.getLong(2),
+                keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(),
+                keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(),
+                statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())),
+                row.getLong(6),
+                row.getLong(7),
+                row.getInt(8));
+    }
+}
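
A round-trip sketch (meta stands for any SstFileMeta; key and row types as in
the schema example above):

    SstFileMetaSerializer serializer = new SstFileMetaSerializer(keyType, valueType);
    RowData row = serializer.toRow(meta);
    SstFileMeta copy = serializer.fromRow(row);
    // copy.equals(meta) holds; fromRow copies the binary key rows, so the
    // result does not share mutable state with the input row
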
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
new file mode 100644
index 0000000..b787b9a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utils for file reading and writing. */
+public class FileUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+    public static final Configuration DEFAULT_READER_CONFIG = new Configuration();
+
+    static {
+        DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+    }
+
+    public static <T> List<T> readListFromFile(
+            Path path,
+            ObjectSerializer<T> serializer,
+            BulkFormat<RowData, FileSourceSplit> readerFactory)
+            throws IOException {
+        List<T> result = new ArrayList<>();
+        long fileSize = FileUtils.getFileSize(path);
+        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+        BulkFormat.Reader<RowData> reader =
+                readerFactory.createReader(DEFAULT_READER_CONFIG, split);
+        Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row)));
+        return result;
+    }
+
+    public static long getFileSize(Path path) throws IOException {
+        return path.getFileSystem().getFileStatus(path).getLen();
+    }
+
+    public static void deleteOrWarn(Path file) {
+        try {
+            FileSystem fs = file.getFileSystem();
+            if (!fs.delete(file, false) && fs.exists(file)) {
+                LOG.warn("Failed to delete file " + file);
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception occurs when deleting file " + file, e);
+        }
+    }
+}
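
A sketch of reading such a file back into objects (fileFormat, path, keyType
and valueType assumed configured elsewhere; the reader schema must match the
schema the file was written with):

    SstFileMetaSerializer serializer = new SstFileMetaSerializer(keyType, valueType);
    BulkFormat<RowData, FileSourceSplit> readerFactory =
            fileFormat.createReaderFactory(SstFileMeta.schema(keyType, valueType));
    List<SstFileMeta> metas = FileUtils.readListFromFile(path, serializer, readerFactory);
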
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
new file mode 100644
index 0000000..28e03b6
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+
+/** Tests for {@link SstFileMetaSerializer}. */
+public class SstFileMetaSerializerTest extends ObjectSerializerTestBase<SstFileMeta> {
+
+    private final SstTestDataGenerator gen = SstTestDataGenerator.builder().build();
+
+    @Override
+    protected SstFileMetaSerializer serializer() {
+        return new SstFileMetaSerializer(
+                TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+    }
+
+    @Override
+    protected SstFileMeta object() {
+        return gen.next().meta;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
new file mode 100644
index 0000000..de6ed25
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializerTest;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SstFile}. */
+public class SstFileTest {
+
+    private final SstTestDataGenerator gen =
+            SstTestDataGenerator.builder().memTableCapacity(20).build();
+    private final FileFormat flushingAvro = new FlushingAvroFormat();
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @RepeatedTest(10)
+    public void testWriteAndReadSstFile() throws Exception {
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile = createSstFile(tempDir.toString());
+        SstFileMetaSerializer serializer =
+                new SstFileMetaSerializer(
+                        TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+
+        List<SstFileMeta> actualMetas =
+                sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+        checkRollingFiles(data.meta, actualMetas, sstFile.suggestedFileSize());
+
+        Iterator<KeyValue> expectedIterator = data.content.iterator();
+        for (SstFileMeta meta : actualMetas) {
+            // check the contents of sst file
+            CloseableIterator<KeyValue> actualKvsIterator =
+                    new RecordReaderIterator(sstFile.read(meta.fileName()));
+            while (actualKvsIterator.hasNext()) {
+                assertThat(expectedIterator.hasNext()).isTrue();
+                KeyValue actualKv = actualKvsIterator.next();
+                assertThat(
+                                KeyValueSerializerTest.equals(
+                                        expectedIterator.next(),
+                                        actualKv,
+                                        TestKeyValueGenerator.KEY_SERIALIZER,
+                                        TestKeyValueGenerator.ROW_SERIALIZER))
+                        .isTrue();
+            }
+            actualKvsIterator.close();
+
+            // check that each sst file meta is serializable
+            assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta);
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
+    @RepeatedTest(10)
+    public void testCleanUpForException() throws IOException {
+        FailingAtomicRenameFileSystem.resetFailCounter(1);
+        FailingAtomicRenameFileSystem.setFailPossibility(10);
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile =
+                createSstFile(FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+
+        try {
+            sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+        } catch (Throwable e) {
+            assertThat(e)
+                    .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+            Path root = new Path(tempDir.toString());
+            FileSystem fs = root.getFileSystem();
+            for (FileStatus bucketStatus : fs.listStatus(root)) {
+                assertThat(bucketStatus.isDir()).isTrue();
+                assertThat(fs.listStatus(bucketStatus.getPath())).isEmpty();
+            }
+        }
+    }
+
+    private SstFile createSstFile(String path) {
+        FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path));
+        SstPathFactory sstPathFactory = fileStorePathFactory.createSstPathFactory(null, 0);
+        int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
+        return new SstFile(
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                // the normal avro format buffers changes in memory, so we can't determine
+                // whether the written file size is really larger than suggested; we therefore
+                // use a special avro format which flushes after every added element
+                flushingAvro,
+                sstPathFactory,
+                suggestedFileSize);
+    }
+
+    private void checkRollingFiles(
+            SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) {
+        // all but last file should be no smaller than suggestedFileSize
+        for (int i = 0; i + 1 < actual.size(); i++) {
+            assertThat(actual.get(i).fileSize() >= suggestedFileSize).isTrue();
+        }
+
+        // expected.rowCount == sum(rowCount)
+        assertThat(actual.stream().mapToLong(SstFileMeta::rowCount).sum())
+                .isEqualTo(expected.rowCount());
+
+        // expected.minKey == firstFile.minKey
+        assertThat(actual.get(0).minKey()).isEqualTo(expected.minKey());
+
+        // expected.maxKey == lastFile.maxKey
+        assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey());
+
+        // TODO check stats after they're collected
+        /*
+        for (int i = 0; i < expected.stats().length; i++) {
+            List<FieldStats> actualStats = new ArrayList<>();
+            for (SstFileMeta meta : actual) {
+                actualStats.add(meta.stats()[i]);
+            }
+            checkRollingFileStats(expected.stats()[i], actualStats);
+        }
+        */
+
+        // expected.minSequenceNumber == min(minSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1))
+                .isEqualTo(expected.minSequenceNumber());
+
+        // expected.maxSequenceNumber == max(maxSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::maxSequenceNumber).max().orElse(-1))
+                .isEqualTo(expected.maxSequenceNumber());
+
+        // expected.level == eachFile.level
+        for (SstFileMeta meta : actual) {
+            assertThat(meta.level()).isEqualTo(expected.level());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+        if (expected.minValue() instanceof Comparable) {
+            Object actualMin = null;
+            Object actualMax = null;
+            for (FieldStats stats : actual) {
+                if (stats.minValue() != null
+                        && (actualMin == null
+                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
+                                        < 0)) {
+                    actualMin = stats.minValue();
+                }
+                if (stats.maxValue() != null
+                        && (actualMax == null
+                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
+                                        > 0)) {
+                    actualMax = stats.maxValue();
+                }
+            }
+            assertThat(actualMin).isEqualTo(expected.minValue());
+            assertThat(actualMax).isEqualTo(expected.maxValue());
+        } else {
+            for (FieldStats stats : actual) {
+                assertThat(stats.minValue()).isNull();
+                assertThat(stats.maxValue()).isNull();
+            }
+        }
+        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
+                .isEqualTo(expected.nullCount());
+    }
+
+    /** A special avro {@link FileFormat} which flushes for every added element. */
+    public static class FlushingAvroFormat implements FileFormat {
+
+        private final FileFormat avro =
+                FileFormat.fromIdentifier(
+                        SstFileTest.class.getClassLoader(), "avro", new Configuration());
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+                RowType type, List<ResolvedExpression> filters) {
+            return avro.createReaderFactory(type, filters);
+        }
+
+        @Override
+        public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+            return fsDataOutputStream -> {
+                BulkWriter<RowData> wrapped =
+                        avro.createWriterFactory(type).create(fsDataOutputStream);
+                return new BulkWriter<RowData>() {
+                    @Override
+                    public void addElement(RowData rowData) throws IOException {
+                        wrapped.addElement(rowData);
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void flush() throws IOException {
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void finish() throws IOException {
+                        wrapped.finish();
+                    }
+                };
+            };
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
new file mode 100644
index 0000000..94b2f08
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** Random SST file generator for tests, producing {@link SstFileMeta} plus file contents. */
+public class SstTestDataGenerator {
+
+    private final int numBuckets;
+    private final int memTableCapacity;
+
+    private final List<Map<BinaryRowData, List<KeyValue>>> memTables;
+    private final TestKeyValueGenerator gen;
+
+    private SstTestDataGenerator(int numBuckets, int memTableCapacity) {
+        this.numBuckets = numBuckets;
+        this.memTableCapacity = memTableCapacity;
+
+        this.memTables = new ArrayList<>();
+        for (int i = 0; i < numBuckets; i++) {
+            memTables.add(new HashMap<>());
+        }
+        this.gen = new TestKeyValueGenerator();
+    }
+
+    public SstFile next() {
+        while (true) {
+            KeyValue kv = gen.next();
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData partition = gen.getPartition(kv);
+            // normalize the hash code into a non-negative bucket index
+            int bucket = (key.hashCode() % numBuckets + numBuckets) % numBuckets;
+            List<KeyValue> memTable =
+                    memTables.get(bucket).computeIfAbsent(partition, k -> new ArrayList<>());
+            memTable.add(kv);
+
+            if (memTable.size() >= memTableCapacity) {
+                List<SstFile> result = createSstFiles(memTable, 0, partition, bucket);
+                memTable.clear();
+                assert result.size() == 1;
+                return result.get(0);
+            }
+        }
+    }
+
+    public List<SstFile> createSstFiles(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        gen.sort(kvs);
+        List<KeyValue> combined = new ArrayList<>();
+        for (int i = 0; i + 1 < kvs.size(); i++) {
+            KeyValue now = kvs.get(i);
+            KeyValue next = kvs.get(i + 1);
+            if (!now.key().equals(next.key())) {
+                combined.add(now);
+            }
+        }
+        combined.add(kvs.get(kvs.size() - 1));
+
+        int capacity = memTableCapacity;
+        for (int i = 0; i < level; i++) {
+            capacity *= memTableCapacity;
+        }
+        List<SstFile> result = new ArrayList<>();
+        for (int i = 0; i < combined.size(); i += capacity) {
+            result.add(
+                    createSstFile(
+                            combined.subList(i, Math.min(i + capacity, combined.size())),
+                            level,
+                            partition,
+                            bucket));
+        }
+        return result;
+    }
+
+    private SstFile createSstFile(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        long totalSize = 0;
+        BinaryRowData minKey = null;
+        BinaryRowData maxKey = null;
+        long minSequenceNumber = Long.MAX_VALUE;
+        long maxSequenceNumber = Long.MIN_VALUE;
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData value = (BinaryRowData) kv.value();
+            totalSize += key.getSizeInBytes() + value.getSizeInBytes();
+            collector.collect(value);
+            if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+                minKey = key;
+            }
+            if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+                maxKey = key;
+            }
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        return new SstFile(
+                partition,
+                bucket,
+                new SstFileMeta(
+                        "sst-" + UUID.randomUUID(),
+                        totalSize,
+                        kvs.size(),
+                        minKey,
+                        maxKey,
+                        collector.extract(),
+                        minSequenceNumber,
+                        maxSequenceNumber,
+                        level),
+                kvs);
+    }
+
+    /** An in-memory SST file. */
+    public static class SstFile {
+        public final BinaryRowData partition;
+        public final int bucket;
+        public final SstFileMeta meta;
+        public final List<KeyValue> content;
+
+        private SstFile(
+                BinaryRowData partition, int bucket, SstFileMeta meta, List<KeyValue> content) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.meta = meta;
+            this.content = content;
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link SstTestDataGenerator}. */
+    public static class Builder {
+        private int numBuckets = 3;
+        private int memTableCapacity = 3;
+
+        public Builder numBuckets(int value) {
+            this.numBuckets = value;
+            return this;
+        }
+
+        public Builder memTableCapacity(int value) {
+            this.memTableCapacity = value;
+            return this;
+        }
+
+        public SstTestDataGenerator build() {
+            return new SstTestDataGenerator(numBuckets, memTableCapacity);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
new file mode 100644
index 0000000..eb3b32a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataOutputStreamWrapper;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link TestAtomicRenameFileSystem} which may fail when reading or writing. Mainly used to
+ * check if components deal with failures correctly.
+ */
+public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
+
+    public static final String SCHEME = "fail";
+
+    private static final AtomicInteger failCounter = new AtomicInteger();
+    // each wrapped read/write call fails with probability 1 / failPossibility,
+    // as long as failCounter has not been exhausted
+    private static int failPossibility = 1000;
+
+    public static void resetFailCounter(int maxValue) {
+        failCounter.set(maxValue);
+    }
+
+    public static void setFailPossibility(int v) {
+        failPossibility = v;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f, bufferSize));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f));
+    }
+
+    @Override
+    public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite)
+            throws IOException {
+        return new FailingFSDataOutputStreamWrapper(super.create(filePath, overwrite));
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
+    public static final class FailingAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new FailingAtomicRenameFileSystem();
+        }
+    }
+
+    /** Specific {@link IOException} produced by {@link FailingAtomicRenameFileSystem}. */
+    public static final class ArtificialException extends IOException {
+
+        public ArtificialException() {
+            super("Artificial exception");
+        }
+    }
+
+    private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
+
+        public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) {
+            super(inputStream);
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            return super.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            return super.read(b, off, len);
+        }
+    }
+
+    private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
+
+        public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+            super(outputStream);
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            super.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            if (ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                    && failCounter.getAndDecrement() > 0) {
+                throw new ArtificialException();
+            }
+            super.write(b, off, len);
+        }
+    }
+}
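
A minimal usage sketch for this file system (hypothetical test code, not part of the
commit; it assumes the "fail" scheme is registered through the FileSystemFactory service
entry added later in this patch): budget three artificial failures at a 1/5 failure
rate, then retry a write until it succeeds.

    import java.io.IOException;

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;

    public class FailingFileSystemSketch {
        public static void main(String[] args) throws IOException {
            FailingAtomicRenameFileSystem.resetFailCounter(3); // at most 3 artificial failures
            FailingAtomicRenameFileSystem.setFailPossibility(5); // each call fails with p = 1/5

            Path path = new Path(FailingAtomicRenameFileSystem.SCHEME + ":///tmp/sketch");
            while (true) {
                try (FSDataOutputStream out =
                        path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE)) {
                    out.write(new byte[] {1, 2, 3});
                    break; // write succeeded
                } catch (IOException e) {
                    // an ArtificialException; the failure budget guarantees termination
                }
            }
        }
    }
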
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
new file mode 100644
index 0000000..2ae6946
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.connector.file.table.RowDataPartitionComputer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/** Factory which produces {@link Path}s for each type of file. */
+public class FileStorePathFactory {
+
+    private final Path root;
+    private final String uuid;
+    @Nullable private final RowDataPartitionComputer partitionComputer;
+
+    private int manifestFileCount;
+    private int manifestListCount;
+
+    public FileStorePathFactory(Path root) {
+        this(root, null, FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue());
+    }
+
+    public FileStorePathFactory(
+            Path root, @Nullable RowType partitionType, String defaultPartValue) {
+        this.root = root;
+        this.uuid = UUID.randomUUID().toString();
+
+        if (partitionType == null) {
+            this.partitionComputer = null;
+        } else {
+            String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
+            this.partitionComputer =
+                    new RowDataPartitionComputer(
+                            defaultPartValue,
+                            partitionColumns,
+                            partitionType.getFields().stream()
+                                    .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
+                                    .toArray(DataType[]::new),
+                            partitionColumns);
+        }
+
+        this.manifestFileCount = 0;
+        this.manifestListCount = 0;
+    }
+
+    public Path newManifestFile() {
+        return new Path(root + "/manifest/manifest-" + uuid + "-" + (manifestFileCount++));
+    }
+
+    public Path newManifestList() {
+        return new Path(root + "/manifest/manifest-list-" + uuid + "-" + (manifestListCount++));
+    }
+
+    public Path toManifestFilePath(String manifestFileName) {
+        return new Path(root + "/manifest/" + manifestFileName);
+    }
+
+    public Path toManifestListPath(String manifestListName) {
+        return new Path(root + "/manifest/" + manifestListName);
+    }
+
+    public Path toSnapshotPath(long id) {
+        return new Path(root + "/snapshot/snapshot-" + id);
+    }
+
+    public SstPathFactory createSstPathFactory(@Nullable BinaryRowData partition, int bucket) {
+        return new SstPathFactory(root, getPartitionString(partition), bucket);
+    }
+
+    public @Nullable String getPartitionString(@Nullable BinaryRowData partition) {
+        if (partitionComputer == null) {
+            return null;
+        }
+        return PartitionPathUtils.generatePartitionPath(
+                partitionComputer.generatePartValues(
+                        Preconditions.checkNotNull(
+                                partition, "Partition row data is null. This is unexpected.")));
+    }
+
+    @VisibleForTesting
+    public String uuid() {
+        return uuid;
+    }
+}
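
To make the produced layout concrete, a sketch of the paths this factory yields
(hypothetical snippet; the real names contain the factory's random UUID):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.utils.FileStorePathFactory;

    public class PathFactorySketch {
        public static void main(String[] args) {
            FileStorePathFactory factory = new FileStorePathFactory(new Path("/warehouse/t1"));

            factory.newManifestFile();   // /warehouse/t1/manifest/manifest-<uuid>-0
            factory.newManifestFile();   // /warehouse/t1/manifest/manifest-<uuid>-1
            factory.newManifestList();   // /warehouse/t1/manifest/manifest-list-<uuid>-0
            factory.toSnapshotPath(7);   // /warehouse/t1/snapshot/snapshot-7

            // For a non-partitioned table the partition string is null; the returned
            // SstPathFactory then resolves sst file names for bucket 0.
            factory.createSstPathFactory(null, 0);
        }
    }
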
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
new file mode 100644
index 0000000..98f4101
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileStatus;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** A modified {@link LocalFileSystem} supporting atomic rename. */
+public class TestAtomicRenameFileSystem extends LocalFileSystem {
+
+    public static final String SCHEME = "test";
+
+    // the same file system object is cached and shared within the same JVM,
+    // so we can rely on Java locks to make renames atomic
+    private final ReentrantLock renameLock;
+
+    public TestAtomicRenameFileSystem() {
+        this.renameLock = new ReentrantLock();
+    }
+
+    @Override
+    public boolean rename(final Path src, final Path dst) throws IOException {
+        File srcFile = pathToFile(src);
+        File dstFile = pathToFile(dst);
+        File dstParent = dstFile.getParentFile();
+        dstParent.mkdirs();
+        renameLock.lock();
+        try {
+            Files.move(srcFile.toPath(), dstFile.toPath());
+            return true;
+        } catch (NoSuchFileException
+                | AccessDeniedException
+                | DirectoryNotEmptyException
+                | SecurityException
+                | FileAlreadyExistsException e) {
+            return false;
+        } finally {
+            renameLock.unlock();
+        }
+    }
+
+    @Override
+    public FileStatus[] listStatus(final Path f) throws IOException {
+        // TODO remove this method once FLINK-25453 is fixed
+        File localf = pathToFile(f);
+        if (!localf.exists()) {
+            return null;
+        }
+        if (localf.isFile()) {
+            return new FileStatus[] {new LocalFileStatus(localf, this)};
+        }
+
+        final String[] names = localf.list();
+        if (names == null) {
+            return null;
+        }
+        List<FileStatus> results = new ArrayList<>();
+        for (String name : names) {
+            try {
+                results.add(getFileStatus(new Path(f, name)));
+            } catch (FileNotFoundException e) {
+                // ignore files that are not found, since the directory listing may have
+                // changed after the names[] array was generated.
+            }
+        }
+
+        return results.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link TestAtomicRenameFileSystem}. */
+    public static final class TestAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new TestAtomicRenameFileSystem();
+        }
+    }
+}
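
The atomic rename enables the usual commit-by-rename pattern in tests; a minimal sketch
(hypothetical code, assuming the "test" scheme is registered via the service file added
below):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class CommitByRenameSketch {
        public static void main(String[] args) throws Exception {
            Path tmp = new Path("test:///tmp/snapshot-1.tmp");
            Path committed = new Path("test:///tmp/snapshot-1");

            FileSystem fs = tmp.getFileSystem();
            try (FSDataOutputStream out = fs.create(tmp, FileSystem.WriteMode.NO_OVERWRITE)) {
                out.write("snapshot contents".getBytes(StandardCharsets.UTF_8));
            }

            // Files.move under renameLock: of two concurrent committers exactly one
            // sees true; the other gets false (FileAlreadyExistsException is swallowed)
            // instead of observing a half-written file.
            boolean success = fs.rename(tmp, committed);
            System.out.println(success ? "committed" : "lost the race");
        }
    }
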
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..82e0c2d
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem$TestAtomicRenameFileSystemFactory
+org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem$FailingAtomicRenameFileSystemFactory
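
With these two entries on the test classpath, Flink's FileSystem registry discovers the
factories through the standard ServiceLoader mechanism, so a test only needs a scheme
lookup (a sketch):

    import java.net.URI;

    import org.apache.flink.core.fs.FileSystem;

    public class SchemeLookupSketch {
        public static void main(String[] args) throws Exception {
            // Resolved via the factories registered above.
            FileSystem testFs = FileSystem.get(URI.create("test:///"));
            FileSystem failFs = FileSystem.get(URI.create("fail:///"));
            System.out.println(testFs.getClass() + " / " + failFs.getClass());
        }
    }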

[flink-table-store] 03/06: [FLINK-25628] Introduce ConcatRecordReader

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 610269895727efaa314d54253f5b1acfc3f769f5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:44:01 2022 +0800

    [FLINK-25628] Introduce ConcatRecordReader
    
    Co-authored-by: Jane Chan <55...@users.noreply.github.com>
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../file/mergetree/compact/ConcatRecordReader.java | 84 ++++++++++++++++++++++
 .../mergetree/compact/ConcatRecordReaderTest.java  | 67 +++++++++++++++++
 2 files changed, 151 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
new file mode 100644
index 0000000..e8c6e44
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * A {@link RecordReader} that concatenates a list of {@link RecordReader}s and reads them
+ * sequentially. The input list must already be sorted by key and sequence number, and the key
+ * intervals of the readers must not overlap each other.
+ */
+public class ConcatRecordReader implements RecordReader {
+
+    private final Queue<ReaderSupplier> queue;
+
+    private RecordReader current;
+
+    protected ConcatRecordReader(List<ReaderSupplier> readerSuppliers) {
+        readerSuppliers.forEach(
+                supplier ->
+                        Preconditions.checkNotNull(supplier, "Reader supplier must not be null."));
+        this.queue = new LinkedList<>(readerSuppliers);
+    }
+
+    public static RecordReader create(List<ReaderSupplier> readers) throws IOException {
+        return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        while (true) {
+            if (current != null) {
+                RecordIterator iterator = current.readBatch();
+                if (iterator != null) {
+                    return iterator;
+                }
+                current.close();
+                current = null;
+            } else if (!queue.isEmpty()) {
+                current = queue.poll().get();
+            } else {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (current != null) {
+            current.close();
+        }
+    }
+
+    /** Supplier to get {@link RecordReader}. */
+    @FunctionalInterface
+    public interface ReaderSupplier {
+        RecordReader get() throws IOException;
+    }
+}
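
A sketch of the intended call pattern (hypothetical caller code; openSstReader is an
assumed stand-in for whatever opens a RecordReader over one sorted, non-overlapping run):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
    import org.apache.flink.table.store.file.utils.RecordReader;

    public class ConcatReaderSketch {
        void readAll() throws IOException {
            // Suppliers, not readers: each file is opened lazily, only once the
            // previous one has been exhausted and closed.
            List<ConcatRecordReader.ReaderSupplier> suppliers =
                    Arrays.asList(() -> openSstReader("sst-1"), () -> openSstReader("sst-2"));

            RecordReader reader = ConcatRecordReader.create(suppliers);
            try {
                RecordReader.RecordIterator batch;
                while ((batch = reader.readBatch()) != null) {
                    // consume the batch according to the RecordIterator contract
                    // (defined with RecordReader earlier in this series)
                }
            } finally {
                reader.close();
            }
        }

        private RecordReader openSstReader(String name) throws IOException {
            throw new UnsupportedOperationException("stand-in for a real file reader");
        }
    }

Note that create() returns the sole reader directly when the list has exactly one
element, avoiding the wrapper entirely.
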
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
new file mode 100644
index 0000000..6ed2edf
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.table.store.file.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Tests for {@link ConcatRecordReader}. */
+public class ConcatRecordReaderTest extends CombiningRecordReaderTestBase {
+
+    @Override
+    protected boolean addOnly() {
+        return false;
+    }
+
+    @Override
+    protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
+        return input;
+    }
+
+    @Override
+    protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) {
+        return new ConcatRecordReader(
+                readers.stream()
+                        .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testSmallData() throws IOException {
+        runTest(
+                parseData(
+                        "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, -, 500 | "
+                                + "7, 4, +, 700 | 9, 20, +, 900",
+                        "",
+                        "12, 6, +, 1200 |  14, 7, +, 1400 |  16, 8, -, 1600 |  18, 9, -, 1800"));
+        runTest(
+                parseData(
+                        " 1, 10, +, 100 |  3, 20, +, 300 |  5, 30, -, 500 | "
+                                + " 7, 40, +, 700 |  9, 200, -, 900",
+                        "",
+                        " 12, 60, +, 1200 |  14, 70, -, 1400 |  16, 80, +, 1600 |  18, 90, -, 1800"));
+    }
+}