Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 10:44:58 UTC

[flink-table-store] 04/04: [FLINK-25630] Introduce MergeTree writer and reader

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 89a183f22007ad1b9afa9f7d9f2124d0e79fbae8
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:43:20 2022 +0800

    [FLINK-25630] Introduce MergeTree writer and reader
    
    This closes #6
---
 .../table/store/file/mergetree/MergeTree.java      | 112 +++++++
 .../store/file/mergetree/MergeTreeReader.java      | 122 ++++++++
 .../store/file/mergetree/MergeTreeWriter.java      | 185 ++++++++++++
 .../table/store/file/mergetree/MergeTreeTest.java  | 323 +++++++++++++++++++++
 4 files changed, 742 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
new file mode 100644
index 0000000..84142d4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/** A merge tree that provides a writer and a reader; changes are tracked at file granularity. */
+public class MergeTree {
+
+    private final MergeTreeOptions options;
+
+    private final SstFile sstFile;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final ExecutorService compactExecutor;
+
+    private final Accumulator accumulator;
+
+    public MergeTree(
+            MergeTreeOptions options,
+            SstFile sstFile,
+            Comparator<RowData> keyComparator,
+            ExecutorService compactExecutor,
+            Accumulator accumulator) {
+        this.options = options;
+        this.sstFile = sstFile;
+        this.keyComparator = keyComparator;
+        this.compactExecutor = compactExecutor;
+        this.accumulator = accumulator;
+    }
+
+    /**
+     * Creates a {@link RecordWriter} from restored files. Files may be compacted while the
+     * writer is in use.
+     */
+    public RecordWriter createWriter(List<SstFileMeta> restoreFiles) {
+        long maxSequenceNumber =
+                restoreFiles.stream()
+                        .map(SstFileMeta::maxSequenceNumber)
+                        .max(Long::compare)
+                        .orElse(-1L);
+        return new MergeTreeWriter(
+                new SortBufferMemTable(
+                        sstFile.keyType(),
+                        sstFile.valueType(),
+                        options.writeBufferSize,
+                        options.pageSize),
+                createCompactManager(),
+                new Levels(keyComparator, restoreFiles, options.numLevels),
+                maxSequenceNumber,
+                keyComparator,
+                accumulator.copy(),
+                sstFile,
+                options.commitForceCompact);
+    }
+
+    /**
+     * Creates a {@link RecordReader} over file sections. The caller decides whether deletion
+     * records are dropped.
+     */
+    public RecordReader createReader(List<List<SortedRun>> sections, boolean dropDelete)
+            throws IOException {
+        return new MergeTreeReader(
+                sections, dropDelete, sstFile, keyComparator, accumulator.copy());
+    }
+
+    private CompactManager createCompactManager() {
+        CompactStrategy compactStrategy =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunMax);
+        CompactManager.Rewriter rewriter =
+                (outputLevel, dropDelete, sections) ->
+                        sstFile.write(
+                                new RecordReaderIterator(createReader(sections, dropDelete)),
+                                outputLevel);
+        return new CompactManager(
+                compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
+    }
+}
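
A minimal usage sketch of the class above, wired the same way as MergeTreeTest
at the end of this commit. The SstFile, MergeTreeOptions and Accumulator
arguments are assumed to be constructed as in that test; everything else comes
from this diff:

    package org.apache.flink.table.store.file.mergetree;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
    import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
    import org.apache.flink.table.store.file.mergetree.sst.SstFile;
    import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
    import org.apache.flink.table.store.file.utils.RecordReader;
    import org.apache.flink.table.store.file.utils.RecordWriter;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class MergeTreeUsageSketch {

        // Hypothetical helper, for illustration only: opens a writer for a
        // fresh tree and a dropDelete reader over existing files.
        static void sketch(
                MergeTreeOptions options,
                SstFile sstFile,
                Accumulator accumulator,
                List<SstFileMeta> existingFiles) throws Exception {
            ExecutorService compactExecutor = Executors.newSingleThreadExecutor();
            // Keys are single-column INT rows, as in MergeTreeTest.
            Comparator<RowData> keyComparator = Comparator.comparingInt(row -> row.getInt(0));
            MergeTree tree =
                    new MergeTree(options, sstFile, keyComparator, compactExecutor, accumulator);

            // A fresh writer; to resume from a restore, pass the restored files instead.
            RecordWriter writer = tree.createWriter(new ArrayList<>());

            // Readers consume sections of non-overlapping sorted runs;
            // IntervalPartition builds them from file metas, as in MergeTreeTest#readAll.
            RecordReader reader =
                    tree.createReader(
                            new IntervalPartition(existingFiles, keyComparator).partition(), true);

            reader.close();
            writer.close();
            compactExecutor.shutdownNow();
        }
    }
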
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
new file mode 100644
index 0000000..30edafb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/** A {@link RecordReader} to read merge tree sections. */
+public class MergeTreeReader implements RecordReader {
+
+    private final RecordReader reader;
+
+    private final boolean dropDelete;
+
+    public MergeTreeReader(
+            List<List<SortedRun>> sections,
+            boolean dropDelete,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        this.dropDelete = dropDelete;
+
+        List<ReaderSupplier> readers = new ArrayList<>();
+        for (List<SortedRun> section : sections) {
+            readers.add(() -> readerForSection(section, sstFile, userKeyComparator, accumulator));
+        }
+        this.reader = ConcatRecordReader.create(readers);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        RecordIterator batch = reader.readBatch();
+
+        if (!dropDelete) {
+            return batch;
+        }
+
+        if (batch == null) {
+            return null;
+        }
+
+        return new RecordIterator() {
+            @Override
+            public KeyValue next() throws IOException {
+                while (true) {
+                    KeyValue kv = batch.next();
+                    if (kv == null) {
+                        return null;
+                    }
+
+                    if (kv.valueKind() == ValueKind.ADD) {
+                        return kv;
+                    }
+                }
+            }
+
+            @Override
+            public void releaseBatch() {
+                batch.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public static RecordReader readerForSection(
+            List<SortedRun> section,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        List<RecordReader> readers = new ArrayList<>();
+        for (SortedRun run : section) {
+            readers.add(readerForRun(run, sstFile));
+        }
+        return SortMergeReader.create(readers, userKeyComparator, accumulator);
+    }
+
+    public static RecordReader readerForRun(SortedRun run, SstFile sstFile) throws IOException {
+        List<ReaderSupplier> readers = new ArrayList<>();
+        for (SstFileMeta file : run.files()) {
+            readers.add(() -> sstFile.read(file.fileName()));
+        }
+        return ConcatRecordReader.create(readers);
+    }
+}
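
A sketch of draining the reader above. The RecordIterator type is not imported
by MergeTreeReader, so it is assumed here to be nested inside RecordReader;
records are consumed in place because each batch is released after use:

    package org.apache.flink.table.store.file.mergetree;

    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.ValueKind;
    import org.apache.flink.table.store.file.utils.RecordReader;

    import java.io.IOException;

    class ReaderDrainSketch {

        // Counts ADD records in a reader, releasing each batch when done.
        static long countAdds(RecordReader reader) throws IOException {
            long adds = 0;
            try {
                RecordReader.RecordIterator batch;
                while ((batch = reader.readBatch()) != null) {
                    KeyValue kv;
                    while ((kv = batch.next()) != null) {
                        // Consume in place; the iterator may reuse objects.
                        if (kv.valueKind() == ValueKind.ADD) {
                            adds++;
                        }
                    }
                    // Hand the batch back before fetching the next one.
                    batch.releaseBatch();
                }
            } finally {
                reader.close();
            }
            return adds;
        }
    }

MergeTreeTest#readAll below wraps this same loop in a RecordReaderIterator,
which is the more convenient form when collecting results.
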
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
new file mode 100644
index 0000000..5cb4a3d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/** A {@link RecordWriter} to write records and generate {@link Increment}s. */
+public class MergeTreeWriter implements RecordWriter {
+
+    private final MemTable memTable;
+
+    private final CompactManager compactManager;
+
+    private final Levels levels;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final Accumulator accumulator;
+
+    private final SstFile sstFile;
+
+    private final boolean commitForceCompact;
+
+    private final LinkedHashSet<SstFileMeta> newFiles;
+
+    private final LinkedHashSet<SstFileMeta> compactBefore;
+
+    private final LinkedHashSet<SstFileMeta> compactAfter;
+
+    private long newSequenceNumber;
+
+    public MergeTreeWriter(
+            MemTable memTable,
+            CompactManager compactManager,
+            Levels levels,
+            long maxSequenceNumber,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            SstFile sstFile,
+            boolean commitForceCompact) {
+        this.memTable = memTable;
+        this.compactManager = compactManager;
+        this.levels = levels;
+        this.newSequenceNumber = maxSequenceNumber + 1;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.sstFile = sstFile;
+        this.commitForceCompact = commitForceCompact;
+        this.newFiles = new LinkedHashSet<>();
+        this.compactBefore = new LinkedHashSet<>();
+        this.compactAfter = new LinkedHashSet<>();
+    }
+
+    private long newSequenceNumber() {
+        return newSequenceNumber++;
+    }
+
+    @VisibleForTesting
+    Levels levels() {
+        return levels;
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        long sequenceNumber = newSequenceNumber();
+        boolean success = memTable.put(sequenceNumber, valueKind, key, value);
+        if (!success) {
+            flush();
+            success = memTable.put(sequenceNumber, valueKind, key, value);
+            if (!success) {
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
+        }
+    }
+
+    private void flush() throws Exception {
+        if (memTable.size() > 0) {
+            finishCompaction();
+            Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
+            List<SstFileMeta> files =
+                    sstFile.write(CloseableIterator.adapterForIterator(iterator), 0);
+            newFiles.addAll(files);
+            files.forEach(levels::addLevel0File);
+            memTable.clear();
+            submitCompaction();
+        }
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        flush();
+        if (commitForceCompact) {
+            finishCompaction();
+        }
+        return drainIncrement();
+    }
+
+    @Override
+    public void sync() throws Exception {
+        finishCompaction();
+    }
+
+    private Increment drainIncrement() {
+        Increment increment =
+                new Increment(
+                        new ArrayList<>(newFiles),
+                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactAfter));
+        newFiles.clear();
+        compactBefore.clear();
+        compactAfter.clear();
+        return increment;
+    }
+
+    private void updateCompactResult(CompactManager.CompactResult result) {
+        for (SstFileMeta file : result.before()) {
+            boolean removed = compactAfter.remove(file);
+            if (removed) {
+                // This is an intermediate file (not a new data file), which is no longer needed
+                // after compaction and can be deleted directly
+                sstFile.delete(file);
+            } else {
+                compactBefore.add(file);
+            }
+        }
+        compactAfter.addAll(result.after());
+    }
+
+    private void submitCompaction() {
+        compactManager.submitCompaction(levels);
+    }
+
+    private void finishCompaction() throws ExecutionException, InterruptedException {
+        Optional<CompactManager.CompactResult> result = compactManager.finishCompaction(levels);
+        if (result.isPresent()) {
+            updateCompactResult(result.get());
+        }
+    }
+
+    @Override
+    public List<SstFileMeta> close() {
+        // delete temporary files
+        List<SstFileMeta> delete = new ArrayList<>(newFiles);
+        delete.addAll(compactAfter);
+        for (SstFileMeta file : delete) {
+            sstFile.delete(file);
+        }
+        newFiles.clear();
+        compactAfter.clear();
+        return delete;
+    }
+}
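
A sketch of the writer lifecycle implemented above, mirroring the calls in
MergeTreeTest#doTestWriteRead; GenericRowData stands in for real key/value
rows:

    package org.apache.flink.table.store.file.mergetree;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.store.file.ValueKind;
    import org.apache.flink.table.store.file.utils.RecordWriter;

    class WriterLifecycleSketch {

        // Writes one batch and turns it into an Increment at commit time.
        static Increment writeAndCommit(RecordWriter writer) throws Exception {
            for (int k = 0; k < 100; k++) {
                writer.write(ValueKind.ADD, GenericRowData.of(k), GenericRowData.of(k * 10));
            }
            // Deletions are ordinary writes; readers created with
            // dropDelete=true filter them out.
            writer.write(ValueKind.DELETE, GenericRowData.of(0), GenericRowData.of(0));

            // Flushes the memtable into level-0 files and drains compaction
            // changes accumulated since the last commit.
            Increment increment = writer.prepareCommit();

            // Blocks until any in-flight compaction has finished.
            writer.sync();

            // increment.newFiles():      freshly flushed files to commit
            // increment.compactBefore(): files replaced by compaction
            // increment.compactAfter():  files produced by compaction
            return increment;
        }
    }
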
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
new file mode 100644
index 0000000..5c00ccc
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MergeTree}. */
+public class MergeTreeTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private static ExecutorService service;
+
+    private SstPathFactory fileFactory;
+
+    private Comparator<RowData> comparator;
+
+    private SstFile sstFile;
+
+    private MergeTree mergeTree;
+
+    private RecordWriter writer;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
+        Path bucketDir = fileFactory.toPath("ignore").getParent();
+        bucketDir.getFileSystem().mkdirs(bucketDir);
+
+        comparator = Comparator.comparingInt(o -> o.getInt(0));
+        Configuration configuration = new Configuration();
+        configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
+        MergeTreeOptions options = new MergeTreeOptions(configuration);
+        sstFile =
+                new SstFile(
+                        new RowType(singletonList(new RowType.RowField("k", new IntType()))),
+                        new RowType(singletonList(new RowType.RowField("v", new IntType()))),
+                        new SstFileTest.FlushingAvroFormat(),
+                        fileFactory,
+                        options.targetFileSize);
+        mergeTree =
+                new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator());
+        writer = mergeTree.createWriter(new ArrayList<>());
+    }
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testEmpty() throws Exception {
+        doTestWriteRead(0);
+    }
+
+    @Test
+    public void test1() throws Exception {
+        doTestWriteRead(1);
+    }
+
+    @Test
+    public void test2() throws Exception {
+        doTestWriteRead(new Random().nextInt(2));
+    }
+
+    @Test
+    public void test8() throws Exception {
+        doTestWriteRead(new Random().nextInt(8));
+    }
+
+    @Test
+    public void testRandom() throws Exception {
+        doTestWriteRead(new Random().nextInt(20));
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        List<TestRecord> expected = new ArrayList<>(writeBatch());
+        List<SstFileMeta> newFiles = writer.prepareCommit().newFiles();
+        writer = mergeTree.createWriter(newFiles);
+        expected.addAll(writeBatch());
+        writer.prepareCommit();
+        writer.sync();
+        assertRecords(expected);
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        doTestWriteRead(6);
+        List<SstFileMeta> files = writer.close();
+        for (SstFileMeta file : files) {
+            Path path = fileFactory.toPath(file.fileName());
+            assertThat(path.getFileSystem().exists(path)).isFalse();
+        }
+    }
+
+    @Test
+    public void testWriteMany() throws Exception {
+        doTestWriteRead(3, 20_000);
+    }
+
+    private void doTestWriteRead(int batchNumber) throws Exception {
+        doTestWriteRead(batchNumber, 200);
+    }
+
+    private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
+        List<TestRecord> expected = new ArrayList<>();
+        List<SstFileMeta> newFiles = new ArrayList<>();
+        Set<String> newFileNames = new HashSet<>();
+        List<SstFileMeta> compactedFiles = new ArrayList<>();
+
+        // write batch and commit
+        for (int i = 0; i <= batchNumber; i++) {
+            if (i < batchNumber) {
+                expected.addAll(writeBatch(perBatch));
+            } else {
+                writer.sync();
+            }
+
+            Increment increment = writer.prepareCommit();
+            newFiles.addAll(increment.newFiles());
+            increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
+
+            // merge compacted
+            compactedFiles.addAll(increment.newFiles());
+            for (SstFileMeta file : increment.compactBefore()) {
+                boolean remove = compactedFiles.remove(file);
+                assertThat(remove).isTrue();
+                if (!newFileNames.contains(file.fileName())) {
+                    sstFile.delete(file);
+                }
+            }
+            compactedFiles.addAll(increment.compactAfter());
+        }
+
+        // assert records from writer
+        assertRecords(expected);
+
+        // assert records from increment new files
+        assertRecords(expected, newFiles, false);
+        assertRecords(expected, newFiles, true);
+
+        // assert records from increment compacted files
+        assertRecords(expected, compactedFiles, true);
+
+        Path bucketDir = fileFactory.toPath("ignore").getParent();
+        Set<String> files =
+                Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
+                        .map(FileStatus::getPath)
+                        .map(Path::getName)
+                        .collect(Collectors.toSet());
+        newFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+        compactedFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+        assertThat(files).isEqualTo(Collections.emptySet());
+    }
+
+    private List<TestRecord> writeBatch() throws Exception {
+        return writeBatch(200);
+    }
+
+    private List<TestRecord> writeBatch(int perBatch) throws Exception {
+        List<TestRecord> records = generateRandom(perBatch);
+        writeAll(records);
+        return records;
+    }
+
+    private void assertRecords(List<TestRecord> expected) throws Exception {
+        // compaction will drop deletion records
+        List<SstFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles();
+        assertRecords(expected, files, true);
+    }
+
+    private void assertRecords(
+            List<TestRecord> expected, List<SstFileMeta> files, boolean dropDelete)
+            throws Exception {
+        assertThat(readAll(files, dropDelete)).isEqualTo(compactAndSort(expected, dropDelete));
+    }
+
+    private List<TestRecord> compactAndSort(List<TestRecord> records, boolean dropDelete) {
+        TreeMap<Integer, TestRecord> map = new TreeMap<>();
+        for (TestRecord record : records) {
+            map.put(record.k, record);
+        }
+        if (dropDelete) {
+            return map.values().stream()
+                    .filter(record -> record.kind == ValueKind.ADD)
+                    .collect(Collectors.toList());
+        }
+        return new ArrayList<>(map.values());
+    }
+
+    private void writeAll(List<TestRecord> records) throws Exception {
+        for (TestRecord record : records) {
+            writer.write(record.kind, row(record.k), row(record.v));
+        }
+    }
+
+    private List<TestRecord> readAll(List<SstFileMeta> files, boolean dropDelete) throws Exception {
+        RecordReader reader =
+                mergeTree.createReader(
+                        new IntervalPartition(files, comparator).partition(), dropDelete);
+        List<TestRecord> records = new ArrayList<>();
+        try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) {
+            while (iterator.hasNext()) {
+                KeyValue kv = iterator.next();
+                records.add(
+                        new TestRecord(kv.valueKind(), kv.key().getInt(0), kv.value().getInt(0)));
+            }
+        }
+        return records;
+    }
+
+    private RowData row(int i) {
+        return GenericRowData.of(i);
+    }
+
+    private List<TestRecord> generateRandom(int perBatch) {
+        Random random = new Random();
+        List<TestRecord> records = new ArrayList<>(perBatch);
+        for (int i = 0; i < perBatch; i++) {
+            records.add(
+                    new TestRecord(
+                            random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+                            random.nextInt(perBatch / 2),
+                            random.nextInt()));
+        }
+        return records;
+    }
+
+    private static class TestRecord {
+
+        private final ValueKind kind;
+        private final int k;
+        private final int v;
+
+        private TestRecord(ValueKind kind, int k, int v) {
+            this.kind = kind;
+            this.k = k;
+            this.v = v;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestRecord that = (TestRecord) o;
+            return k == that.k && v == that.v && kind == that.kind;
+        }
+
+        @Override
+        public String toString() {
+            return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + '}';
+        }
+    }
+}