You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 10:44:58 UTC
[flink-table-store] 04/04: [FLINK-25630] Introduce MergeTree writer and reader
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 89a183f22007ad1b9afa9f7d9f2124d0e79fbae8
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:43:20 2022 +0800
[FLINK-25630] Introduce MergeTree writer and reader
This closes #6
---
.../table/store/file/mergetree/MergeTree.java | 112 +++++++
.../store/file/mergetree/MergeTreeReader.java | 122 ++++++++
.../store/file/mergetree/MergeTreeWriter.java | 185 ++++++++++++
.../table/store/file/mergetree/MergeTreeTest.java | 323 +++++++++++++++++++++
4 files changed, 742 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
new file mode 100644
index 0000000..84142d4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A merge tree that hands out {@link RecordWriter}s and {@link RecordReader}s over sorted runs of
+ * SST files. Changes are tracked at the granularity of whole files.
+ */
+public class MergeTree {
+
+    private final MergeTreeOptions options;
+
+    private final SstFile sstFile;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final ExecutorService compactExecutor;
+
+    private final Accumulator accumulator;
+
+    public MergeTree(
+            MergeTreeOptions options,
+            SstFile sstFile,
+            Comparator<RowData> keyComparator,
+            ExecutorService compactExecutor,
+            Accumulator accumulator) {
+        this.options = options;
+        this.sstFile = sstFile;
+        this.keyComparator = keyComparator;
+        this.compactExecutor = compactExecutor;
+        this.accumulator = accumulator;
+    }
+
+    /**
+     * Creates a {@link RecordWriter} on top of previously written files. Compactions may be
+     * triggered while records are being written.
+     *
+     * @param restoreFiles files to rebuild the levels from; may be empty
+     * @return a writer whose sequence numbers continue after the largest restored one
+     */
+    public RecordWriter createWriter(List<SstFileMeta> restoreFiles) {
+        // With nothing restored the maximum is -1, so the writer starts at sequence number 0.
+        long maxSeq =
+                restoreFiles.stream()
+                        .mapToLong(SstFileMeta::maxSequenceNumber)
+                        .max()
+                        .orElse(-1L);
+        MemTable buffer =
+                new SortBufferMemTable(
+                        sstFile.keyType(),
+                        sstFile.valueType(),
+                        options.writeBufferSize,
+                        options.pageSize);
+        CompactManager manager = createCompactManager();
+        Levels restoredLevels = new Levels(keyComparator, restoreFiles, options.numLevels);
+        // Each writer gets its own accumulator copy.
+        return new MergeTreeWriter(
+                buffer,
+                manager,
+                restoredLevels,
+                maxSeq,
+                keyComparator,
+                accumulator.copy(),
+                sstFile,
+                options.commitForceCompact);
+    }
+
+    /**
+     * Creates a {@link RecordReader} over the given file sections.
+     *
+     * @param sections sections of sorted runs to read
+     * @param dropDelete whether records whose kind is not ADD are filtered out
+     */
+    public RecordReader createReader(List<List<SortedRun>> sections, boolean dropDelete)
+            throws IOException {
+        Accumulator readerAccumulator = accumulator.copy();
+        return new MergeTreeReader(
+                sections, dropDelete, sstFile, keyComparator, readerAccumulator);
+    }
+
+    /** Builds a {@link CompactManager} using universal compaction and this tree's reader. */
+    private CompactManager createCompactManager() {
+        CompactStrategy strategy =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunMax);
+        // Rewriting = merge-reading the sections and writing the result at the output level.
+        CompactManager.Rewriter sectionRewriter =
+                (targetLevel, dropDeletes, inputSections) ->
+                        sstFile.write(
+                                new RecordReaderIterator(
+                                        createReader(inputSections, dropDeletes)),
+                                targetLevel);
+        return new CompactManager(
+                compactExecutor,
+                strategy,
+                keyComparator,
+                options.targetFileSize,
+                sectionRewriter);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
new file mode 100644
index 0000000..30edafb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A {@link RecordReader} over merge tree sections. Runs within a section are sort-merged, the
+ * sections are concatenated, and records that are not ADD can optionally be filtered out.
+ */
+public class MergeTreeReader implements RecordReader {
+
+    private final RecordReader reader;
+
+    private final boolean dropDelete;
+
+    /**
+     * @param sections sections to read, each a list of sorted runs merged together
+     * @param dropDelete whether records whose kind is not {@link ValueKind#ADD} are skipped
+     * @param sstFile used to open the underlying files
+     * @param userKeyComparator key order used when merging runs of a section
+     * @param accumulator passed to the sort-merge reader of every section
+     */
+    public MergeTreeReader(
+            List<List<SortedRun>> sections,
+            boolean dropDelete,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        this.dropDelete = dropDelete;
+
+        // Suppliers defer opening a section until the concatenating reader reaches it.
+        List<ReaderSupplier> sectionSuppliers = new ArrayList<>(sections.size());
+        for (List<SortedRun> sec : sections) {
+            sectionSuppliers.add(
+                    () -> readerForSection(sec, sstFile, userKeyComparator, accumulator));
+        }
+        this.reader = ConcatRecordReader.create(sectionSuppliers);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        RecordIterator batch = reader.readBatch();
+        // Pass through untouched when the input is exhausted or deletions are kept.
+        if (batch == null || !dropDelete) {
+            return batch;
+        }
+
+        return new RecordIterator() {
+            @Override
+            public KeyValue next() throws IOException {
+                // Skip every record that is not an ADD; null signals the end of the batch.
+                for (KeyValue record = batch.next(); record != null; record = batch.next()) {
+                    if (record.valueKind() == ValueKind.ADD) {
+                        return record;
+                    }
+                }
+                return null;
+            }
+
+            @Override
+            public void releaseBatch() {
+                batch.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /** Creates a reader that sort-merges all sorted runs of one section. */
+    public static RecordReader readerForSection(
+            List<SortedRun> section,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        List<RecordReader> runReaders = new ArrayList<>(section.size());
+        for (SortedRun sortedRun : section) {
+            runReaders.add(readerForRun(sortedRun, sstFile));
+        }
+        return SortMergeReader.create(runReaders, userKeyComparator, accumulator);
+    }
+
+    /** Creates a reader that reads the files of one sorted run in order, one after another. */
+    public static RecordReader readerForRun(SortedRun run, SstFile sstFile) throws IOException {
+        List<ReaderSupplier> fileSuppliers = new ArrayList<>();
+        for (SstFileMeta meta : run.files()) {
+            fileSuppliers.add(() -> sstFile.read(meta.fileName()));
+        }
+        return ConcatRecordReader.create(fileSuppliers);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
new file mode 100644
index 0000000..5cb4a3d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link RecordWriter} that buffers records in a {@link MemTable}, flushes them as level-0 files
+ * into the {@link Levels}, triggers compactions, and reports file changes as an {@link Increment}.
+ */
+public class MergeTreeWriter implements RecordWriter {
+
+    private final MemTable memTable;
+
+    private final CompactManager compactManager;
+
+    private final Levels levels;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final Accumulator accumulator;
+
+    private final SstFile sstFile;
+
+    /** Whether {@link #prepareCommit()} waits for the running compaction before draining. */
+    private final boolean commitForceCompact;
+
+    /** Files flushed from the mem table since the last drained increment. */
+    private final LinkedHashSet<SstFileMeta> newFiles;
+
+    /** Compaction inputs since the last drained increment (intermediate files excluded). */
+    private final LinkedHashSet<SstFileMeta> compactBefore;
+
+    /** Compaction outputs since the last drained increment. */
+    private final LinkedHashSet<SstFileMeta> compactAfter;
+
+    /** Sequence number assigned to the next written record. */
+    private long newSequenceNumber;
+
+    public MergeTreeWriter(
+            MemTable memTable,
+            CompactManager compactManager,
+            Levels levels,
+            long maxSequenceNumber,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            SstFile sstFile,
+            boolean commitForceCompact) {
+        this.memTable = memTable;
+        this.compactManager = compactManager;
+        this.levels = levels;
+        this.newSequenceNumber = maxSequenceNumber + 1;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.sstFile = sstFile;
+        this.commitForceCompact = commitForceCompact;
+        this.newFiles = new LinkedHashSet<>();
+        this.compactBefore = new LinkedHashSet<>();
+        this.compactAfter = new LinkedHashSet<>();
+    }
+
+    private long newSequenceNumber() {
+        return newSequenceNumber++;
+    }
+
+    @VisibleForTesting
+    Levels levels() {
+        return levels;
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        long sequenceNumber = newSequenceNumber();
+        boolean success = memTable.put(sequenceNumber, valueKind, key, value);
+        if (!success) {
+            // The mem table is full: flush it and retry once on the now empty table.
+            flush();
+            success = memTable.put(sequenceNumber, valueKind, key, value);
+            if (!success) {
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
+        }
+    }
+
+    /** Writes the mem table content as level-0 files and submits a new compaction. */
+    private void flush() throws Exception {
+        if (memTable.size() > 0) {
+            // Apply the previous compaction's result before adding files and submitting again.
+            finishCompaction();
+            Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
+            List<SstFileMeta> files =
+                    sstFile.write(CloseableIterator.adapterForIterator(iterator), 0);
+            newFiles.addAll(files);
+            files.forEach(levels::addLevel0File);
+            memTable.clear();
+            submitCompaction();
+        }
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        flush();
+        if (commitForceCompact) {
+            finishCompaction();
+        }
+        return drainIncrement();
+    }
+
+    @Override
+    public void sync() throws Exception {
+        finishCompaction();
+    }
+
+    /** Snapshots the pending file changes into an {@link Increment} and resets them. */
+    private Increment drainIncrement() {
+        Increment increment =
+                new Increment(
+                        new ArrayList<>(newFiles),
+                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactAfter));
+        newFiles.clear();
+        compactBefore.clear();
+        compactAfter.clear();
+        return increment;
+    }
+
+    private void updateCompactResult(CompactManager.CompactResult result) {
+        for (SstFileMeta file : result.before()) {
+            boolean removed = compactAfter.remove(file);
+            if (removed) {
+                // This is an intermediate file (not a new data file), which is no longer needed
+                // after compaction and can be deleted directly
+                sstFile.delete(file);
+            } else {
+                compactBefore.add(file);
+            }
+        }
+        compactAfter.addAll(result.after());
+    }
+
+    private void submitCompaction() {
+        compactManager.submitCompaction(levels);
+    }
+
+    /** Collects the result of the submitted compaction, if any, and records its file changes. */
+    private void finishCompaction() throws ExecutionException, InterruptedException {
+        compactManager.finishCompaction(levels).ifPresent(this::updateCompactResult);
+    }
+
+    /**
+     * Deletes every file this writer produced that was not yet handed out by {@link
+     * #prepareCommit()}.
+     *
+     * @return the deleted files
+     */
+    @Override
+    public List<SstFileMeta> close() {
+        // Collect a compaction that may still be running. Otherwise the files it writes would be
+        // neither tracked nor deleted and would leak. Best effort: cleanup below always runs.
+        try {
+            finishCompaction();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            // Compaction failed; still delete the files that are known to be uncommitted.
+        }
+
+        // delete temporary files
+        List<SstFileMeta> delete = new ArrayList<>(newFiles);
+        delete.addAll(compactAfter);
+        for (SstFileMeta file : delete) {
+            sstFile.delete(file);
+        }
+        newFiles.clear();
+        compactAfter.clear();
+        return delete;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
new file mode 100644
index 0000000..5c00ccc
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MergeTree}. */
+public class MergeTreeTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private static ExecutorService service;
+
+ private SstPathFactory fileFactory;
+
+ private Comparator<RowData> comparator;
+
+ private SstFile sstFile;
+
+ private MergeTree mergeTree;
+
+ private RecordWriter writer;
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
+ Path bucketDir = fileFactory.toPath("ignore").getParent();
+ bucketDir.getFileSystem().mkdirs(bucketDir);
+
+ comparator = Comparator.comparingInt(o -> o.getInt(0));
+ Configuration configuration = new Configuration();
+ configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
+ MergeTreeOptions options = new MergeTreeOptions(configuration);
+ sstFile =
+ new SstFile(
+ new RowType(singletonList(new RowType.RowField("k", new IntType()))),
+ new RowType(singletonList(new RowType.RowField("v", new IntType()))),
+ new SstFileTest.FlushingAvroFormat(),
+ fileFactory,
+ options.targetFileSize);
+ mergeTree =
+ new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator());
+ writer = mergeTree.createWriter(new ArrayList<>());
+ }
+
+ @BeforeAll
+ public static void before() {
+ service = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterAll
+ public static void after() {
+ service.shutdownNow();
+ service = null;
+ }
+
+ @Test
+ public void testEmpty() throws Exception {
+ doTestWriteRead(0);
+ }
+
+ @Test
+ public void test1() throws Exception {
+ doTestWriteRead(1);
+ }
+
+ @Test
+ public void test2() throws Exception {
+ doTestWriteRead(new Random().nextInt(2));
+ }
+
+ @Test
+ public void test8() throws Exception {
+ doTestWriteRead(new Random().nextInt(8));
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ doTestWriteRead(new Random().nextInt(20));
+ }
+
+ @Test
+ public void testRestore() throws Exception {
+ List<TestRecord> expected = new ArrayList<>(writeBatch());
+ List<SstFileMeta> newFiles = writer.prepareCommit().newFiles();
+ writer = mergeTree.createWriter(newFiles);
+ expected.addAll(writeBatch());
+ writer.prepareCommit();
+ writer.sync();
+ assertRecords(expected);
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ doTestWriteRead(6);
+ List<SstFileMeta> files = writer.close();
+ for (SstFileMeta file : files) {
+ Path path = fileFactory.toPath(file.fileName());
+ assertThat(path.getFileSystem().exists(path)).isFalse();
+ }
+ }
+
+ @Test
+ public void testWriteMany() throws Exception {
+ doTestWriteRead(3, 20_000);
+ }
+
+ private void doTestWriteRead(int batchNumber) throws Exception {
+ doTestWriteRead(batchNumber, 200);
+ }
+
+ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
+ List<TestRecord> expected = new ArrayList<>();
+ List<SstFileMeta> newFiles = new ArrayList<>();
+ Set<String> newFileNames = new HashSet<>();
+ List<SstFileMeta> compactedFiles = new ArrayList<>();
+
+ // write batch and commit
+ for (int i = 0; i <= batchNumber; i++) {
+ if (i < batchNumber) {
+ expected.addAll(writeBatch(perBatch));
+ } else {
+ writer.sync();
+ }
+
+ Increment increment = writer.prepareCommit();
+ newFiles.addAll(increment.newFiles());
+ increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
+
+ // merge compacted
+ compactedFiles.addAll(increment.newFiles());
+ for (SstFileMeta file : increment.compactBefore()) {
+ boolean remove = compactedFiles.remove(file);
+ assertThat(remove).isTrue();
+ if (!newFileNames.contains(file.fileName())) {
+ sstFile.delete(file);
+ }
+ }
+ compactedFiles.addAll(increment.compactAfter());
+ }
+
+ // assert records from writer
+ assertRecords(expected);
+
+ // assert records from increment new files
+ assertRecords(expected, newFiles, false);
+ assertRecords(expected, newFiles, true);
+
+ // assert records from increment compacted files
+ assertRecords(expected, compactedFiles, true);
+
+ Path bucketDir = fileFactory.toPath("ignore").getParent();
+ Set<String> files =
+ Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
+ .map(FileStatus::getPath)
+ .map(Path::getName)
+ .collect(Collectors.toSet());
+ newFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+ compactedFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+ assertThat(files).isEqualTo(Collections.emptySet());
+ }
+
+ private List<TestRecord> writeBatch() throws Exception {
+ return writeBatch(200);
+ }
+
+ private List<TestRecord> writeBatch(int perBatch) throws Exception {
+ List<TestRecord> records = generateRandom(perBatch);
+ writeAll(records);
+ return records;
+ }
+
+ private void assertRecords(List<TestRecord> expected) throws Exception {
+ // compaction will drop delete
+ List<SstFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles();
+ assertRecords(expected, files, true);
+ }
+
+ private void assertRecords(
+ List<TestRecord> expected, List<SstFileMeta> files, boolean dropDelete)
+ throws Exception {
+ assertThat(readAll(files, dropDelete)).isEqualTo(compactAndSort(expected, dropDelete));
+ }
+
+ private List<TestRecord> compactAndSort(List<TestRecord> records, boolean dropDelete) {
+ TreeMap<Integer, TestRecord> map = new TreeMap<>();
+ for (TestRecord record : records) {
+ map.put(record.k, record);
+ }
+ if (dropDelete) {
+ return map.values().stream()
+ .filter(record -> record.kind == ValueKind.ADD)
+ .collect(Collectors.toList());
+ }
+ return new ArrayList<>(map.values());
+ }
+
+ private void writeAll(List<TestRecord> records) throws Exception {
+ for (TestRecord record : records) {
+ writer.write(record.kind, row(record.k), row(record.v));
+ }
+ }
+
+ private List<TestRecord> readAll(List<SstFileMeta> files, boolean dropDelete) throws Exception {
+ RecordReader reader =
+ mergeTree.createReader(
+ new IntervalPartition(files, comparator).partition(), dropDelete);
+ List<TestRecord> records = new ArrayList<>();
+ try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) {
+ while (iterator.hasNext()) {
+ KeyValue kv = iterator.next();
+ records.add(
+ new TestRecord(kv.valueKind(), kv.key().getInt(0), kv.value().getInt(0)));
+ }
+ }
+ return records;
+ }
+
+ private RowData row(int i) {
+ return GenericRowData.of(i);
+ }
+
+ private List<TestRecord> generateRandom(int perBatch) {
+ Random random = new Random();
+ List<TestRecord> records = new ArrayList<>(perBatch);
+ for (int i = 0; i < perBatch; i++) {
+ records.add(
+ new TestRecord(
+ random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextInt(perBatch / 2),
+ random.nextInt()));
+ }
+ return records;
+ }
+
+ private static class TestRecord {
+
+ private final ValueKind kind;
+ private final int k;
+ private final int v;
+
+ private TestRecord(ValueKind kind, int k, int v) {
+ this.kind = kind;
+ this.k = k;
+ this.v = v;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestRecord that = (TestRecord) o;
+ return k == that.k && v == that.v && kind == that.kind;
+ }
+
+ @Override
+ public String toString() {
+ return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + '}';
+ }
+ }
+}