Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 10:44:54 UTC

[flink-table-store] branch master updated (ac9d902 -> 89a183f)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git.


    from ac9d902  [FLINK-25629] Introduce CompactManager
     new 49e4762  [FLINK-25630] Add LevelsTest
     new c94dfa8  [FLINK-25630] Introduce RecordWriter
     new 58d50fb  [FLINK-25630] Introduce MergeTreeOptions
     new 89a183f  [FLINK-25630] Introduce MergeTree writer and reader

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/store/file/mergetree/Increment.java      |  63 ++++
 .../table/store/file/mergetree/MergeTree.java      | 112 +++++++
 .../store/file/mergetree/MergeTreeOptions.java     | 132 +++++++++
 .../store/file/mergetree/MergeTreeReader.java      | 122 ++++++++
 .../store/file/mergetree/MergeTreeWriter.java      | 185 ++++++++++++
 .../flink/table/store/file/utils/RecordWriter.java |  57 ++++
 .../table/store/file/mergetree/LevelsTest.java     |  67 +++++
 .../table/store/file/mergetree/MergeTreeTest.java  | 323 +++++++++++++++++++++
 8 files changed, 1061 insertions(+)
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
 create mode 100644 flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java

[flink-table-store] 02/04: [FLINK-25630] Introduce RecordWriter

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit c94dfa8191bc39bf57e05b1e37840a3e0e4cd43d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:38:15 2022 +0800

    [FLINK-25630] Introduce RecordWriter
---
 .../table/store/file/mergetree/Increment.java      | 63 ++++++++++++++++++++++
 .../flink/table/store/file/utils/RecordWriter.java | 57 ++++++++++++++++++++
 2 files changed, 120 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
new file mode 100644
index 0000000..c8afffe
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Incremental files for the merge tree. It consists of two parts:
+ *
+ * <ul>
+ *   <li>New files: The new files generated in this snapshot cycle. They must be committed.
+ *   <li>Compact files: The {@link #compactBefore} files are compacted into {@link #compactAfter}
+ *       files in this snapshot cycle. Compaction is an optimization of the file layout.
+ * </ul>
+ */
+public class Increment {
+
+    private final List<SstFileMeta> newFiles;
+
+    private final List<SstFileMeta> compactBefore;
+
+    private final List<SstFileMeta> compactAfter;
+
+    public Increment(
+            List<SstFileMeta> newFiles,
+            List<SstFileMeta> compactBefore,
+            List<SstFileMeta> compactAfter) {
+        this.newFiles = Collections.unmodifiableList(newFiles);
+        this.compactBefore = Collections.unmodifiableList(compactBefore);
+        this.compactAfter = Collections.unmodifiableList(compactAfter);
+    }
+
+    public List<SstFileMeta> newFiles() {
+        return newFiles;
+    }
+
+    public List<SstFileMeta> compactBefore() {
+        return compactBefore;
+    }
+
+    public List<SstFileMeta> compactAfter() {
+        return compactAfter;
+    }
+}
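
A minimal sketch of how a committer might consume an Increment. The IncrementExample class and its
printing are hypothetical; the actual commit handling is outside this change.

    import org.apache.flink.table.store.file.mergetree.Increment;
    import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;

    import java.util.List;

    /** Hypothetical consumer of an Increment (sketch, not part of the patch). */
    class IncrementExample {

        static void describe(Increment increment) {
            // New files generated in this snapshot cycle; they must be committed.
            List<SstFileMeta> added = increment.newFiles();
            // Compaction rewrote compactBefore() into compactAfter(); a snapshot
            // drops the former and registers the latter.
            List<SstFileMeta> dropped = increment.compactBefore();
            List<SstFileMeta> produced = increment.compactAfter();
            System.out.printf(
                    "commit: %d new, %d compacted away, %d compaction results%n",
                    added.size(), dropped.size(), produced.size());
        }
    }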
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
new file mode 100644
index 0000000..4047301
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.List;
+
+/**
+ * The {@code RecordWriter} is responsible for writing data and handling the in-progress files
+ * that hold data not yet staged for commit. The incremental files that are ready to commit are
+ * returned to the system by {@link #prepareCommit()}.
+ */
+public interface RecordWriter {
+
+    /** Add a key-value element to the writer. */
+    void write(ValueKind valueKind, RowData key, RowData value) throws Exception;
+
+    /**
+     * Prepare for a commit.
+     *
+     * @return Incremental files in this snapshot cycle
+     */
+    Increment prepareCommit() throws Exception;
+
+    /**
+     * Sync the writer. The structures related to file reading and writing are not thread safe,
+     * and the writer runs asynchronous threads internally, so callers must sync before reading data.
+     */
+    void sync() throws Exception;
+
+    /**
+     * Close this writer. The call deletes newly generated but not yet committed files.
+     *
+     * @return Deleted files.
+     */
+    List<SstFileMeta> close() throws Exception;
+}
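
A minimal sketch of the writer lifecycle, assuming an already constructed RecordWriter instance;
the RecordWriterExample helper and its method name are hypothetical.

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.store.file.ValueKind;
    import org.apache.flink.table.store.file.mergetree.Increment;
    import org.apache.flink.table.store.file.utils.RecordWriter;

    /** Hypothetical driver for a RecordWriter (sketch, not part of the patch). */
    class RecordWriterExample {

        static Increment writeAndCommit(RecordWriter writer) throws Exception {
            // Buffer key-value changes; keys and values are single-int rows here.
            writer.write(ValueKind.ADD, GenericRowData.of(1), GenericRowData.of(100));
            writer.write(ValueKind.DELETE, GenericRowData.of(2), GenericRowData.of(200));
            // Flush in-progress data and collect the incremental files to commit.
            Increment increment = writer.prepareCommit();
            // Wait for the writer's asynchronous threads before reading the files.
            writer.sync();
            return increment;
        }
    }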

[flink-table-store] 01/04: [FLINK-25630] Add LevelsTest

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 49e476251ed8aeea331f171e55bdb8a8f27214d8
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:44:36 2022 +0800

    [FLINK-25630] Add LevelsTest
---
 .../table/store/file/mergetree/LevelsTest.java     | 67 ++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
new file mode 100644
index 0000000..3f6a89b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Levels}. */
+public class LevelsTest {
+
+    private final Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
+
+    @Test
+    public void testNonEmptyHighestLevelNo() {
+        Levels levels = new Levels(comparator, Collections.emptyList(), 3);
+        assertThat(levels.nonEmptyHighestLevel()).isEqualTo(-1);
+    }
+
+    @Test
+    public void testNonEmptyHighestLevel0() {
+
+        Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(0)), 3);
+        assertThat(levels.nonEmptyHighestLevel()).isEqualTo(0);
+    }
+
+    @Test
+    public void testNonEmptyHighestLevel1() {
+        Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(1)), 3);
+        assertThat(levels.nonEmptyHighestLevel()).isEqualTo(1);
+    }
+
+    @Test
+    public void testNonEmptyHighestLevel2() {
+        Levels levels =
+                new Levels(comparator, Arrays.asList(newFile(0), newFile(1), newFile(2)), 3);
+        assertThat(levels.nonEmptyHighestLevel()).isEqualTo(2);
+    }
+
+    public static SstFileMeta newFile(int level) {
+        return new SstFileMeta("", 0, 1, row(0), row(0), null, 0, 1, level);
+    }
+}

[flink-table-store] 04/04: [FLINK-25630] Introduce MergeTree writer and reader

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 89a183f22007ad1b9afa9f7d9f2124d0e79fbae8
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:43:20 2022 +0800

    [FLINK-25630] Introduce MergeTree writer and reader
    
    This closes #6
---
 .../table/store/file/mergetree/MergeTree.java      | 112 +++++++
 .../store/file/mergetree/MergeTreeReader.java      | 122 ++++++++
 .../store/file/mergetree/MergeTreeWriter.java      | 185 ++++++++++++
 .../table/store/file/mergetree/MergeTreeTest.java  | 323 +++++++++++++++++++++
 4 files changed, 742 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
new file mode 100644
index 0000000..84142d4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/** A merge tree that provides a writer and readers; changes happen at the granularity of files. */
+public class MergeTree {
+
+    private final MergeTreeOptions options;
+
+    private final SstFile sstFile;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final ExecutorService compactExecutor;
+
+    private final Accumulator accumulator;
+
+    public MergeTree(
+            MergeTreeOptions options,
+            SstFile sstFile,
+            Comparator<RowData> keyComparator,
+            ExecutorService compactExecutor,
+            Accumulator accumulator) {
+        this.options = options;
+        this.sstFile = sstFile;
+        this.keyComparator = keyComparator;
+        this.compactExecutor = compactExecutor;
+        this.accumulator = accumulator;
+    }
+
+    /**
+     * Create a {@link RecordWriter} from restored files. Some compaction of files may occur
+     * during the write process.
+     */
+    public RecordWriter createWriter(List<SstFileMeta> restoreFiles) {
+        long maxSequenceNumber =
+                restoreFiles.stream()
+                        .map(SstFileMeta::maxSequenceNumber)
+                        .max(Long::compare)
+                        .orElse(-1L);
+        return new MergeTreeWriter(
+                new SortBufferMemTable(
+                        sstFile.keyType(),
+                        sstFile.valueType(),
+                        options.writeBufferSize,
+                        options.pageSize),
+                createCompactManager(),
+                new Levels(keyComparator, restoreFiles, options.numLevels),
+                maxSequenceNumber,
+                keyComparator,
+                accumulator.copy(),
+                sstFile,
+                options.commitForceCompact);
+    }
+
+    /**
+     * Create a {@link RecordReader} from file sections. The caller can decide whether to drop
+     * deletion records.
+     */
+    public RecordReader createReader(List<List<SortedRun>> sections, boolean dropDelete)
+            throws IOException {
+        return new MergeTreeReader(
+                sections, dropDelete, sstFile, keyComparator, accumulator.copy());
+    }
+
+    private CompactManager createCompactManager() {
+        CompactStrategy compactStrategy =
+                new UniversalCompaction(
+                        options.maxSizeAmplificationPercent,
+                        options.sizeRatio,
+                        options.numSortedRunMax);
+        CompactManager.Rewriter rewriter =
+                (outputLevel, dropDelete, sections) ->
+                        sstFile.write(
+                                new RecordReaderIterator(createReader(sections, dropDelete)),
+                                outputLevel);
+        return new CompactManager(
+                compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
+    }
+}
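
A sketch of assembling a MergeTree and obtaining a fresh writer, following the wiring used by
MergeTreeTest later in this patch; the SstFile construction is elided and the MergeTreeExample
helper name is hypothetical.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.file.mergetree.MergeTree;
    import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
    import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
    import org.apache.flink.table.store.file.mergetree.sst.SstFile;
    import org.apache.flink.table.store.file.utils.RecordWriter;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Hypothetical assembly of a MergeTree (sketch, not part of the patch). */
    class MergeTreeExample {

        static RecordWriter newWriter(SstFile sstFile) {
            // All-default options; see MergeTreeOptions in commit 03/04.
            MergeTreeOptions options = new MergeTreeOptions(new Configuration());
            Comparator<RowData> keyComparator = Comparator.comparingInt(row -> row.getInt(0));
            ExecutorService compactExecutor = Executors.newSingleThreadExecutor();
            MergeTree mergeTree =
                    new MergeTree(
                            options, sstFile, keyComparator, compactExecutor,
                            new DeduplicateAccumulator());
            // An empty restore list starts a fresh writer; sequence numbers begin at 0.
            return mergeTree.createWriter(new ArrayList<>());
        }
    }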
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
new file mode 100644
index 0000000..30edafb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/** A {@link RecordReader} to read merge tree sections. */
+public class MergeTreeReader implements RecordReader {
+
+    private final RecordReader reader;
+
+    private final boolean dropDelete;
+
+    public MergeTreeReader(
+            List<List<SortedRun>> sections,
+            boolean dropDelete,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        this.dropDelete = dropDelete;
+
+        List<ReaderSupplier> readers = new ArrayList<>();
+        for (List<SortedRun> section : sections) {
+            readers.add(() -> readerForSection(section, sstFile, userKeyComparator, accumulator));
+        }
+        this.reader = ConcatRecordReader.create(readers);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator readBatch() throws IOException {
+        RecordIterator batch = reader.readBatch();
+
+        if (!dropDelete) {
+            return batch;
+        }
+
+        if (batch == null) {
+            return null;
+        }
+
+        return new RecordIterator() {
+            @Override
+            public KeyValue next() throws IOException {
+                while (true) {
+                    KeyValue kv = batch.next();
+                    if (kv == null) {
+                        return null;
+                    }
+
+                    if (kv.valueKind() == ValueKind.ADD) {
+                        return kv;
+                    }
+                }
+            }
+
+            @Override
+            public void releaseBatch() {
+                batch.releaseBatch();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public static RecordReader readerForSection(
+            List<SortedRun> section,
+            SstFile sstFile,
+            Comparator<RowData> userKeyComparator,
+            Accumulator accumulator)
+            throws IOException {
+        List<RecordReader> readers = new ArrayList<>();
+        for (SortedRun run : section) {
+            readers.add(readerForRun(run, sstFile));
+        }
+        return SortMergeReader.create(readers, userKeyComparator, accumulator);
+    }
+
+    public static RecordReader readerForRun(SortedRun run, SstFile sstFile) throws IOException {
+        List<ReaderSupplier> readers = new ArrayList<>();
+        for (SstFileMeta file : run.files()) {
+            readers.add(() -> sstFile.read(file.fileName()));
+        }
+        return ConcatRecordReader.create(readers);
+    }
+}
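
The read path hands out batches; a caller drives the nested iterator until both are exhausted. A
minimal sketch, assuming RecordIterator is the nested iterator type of RecordReader (as used by
readBatch() above); the ReaderLoopExample helper name is hypothetical.

    import org.apache.flink.table.store.file.KeyValue;
    import org.apache.flink.table.store.file.utils.RecordReader;

    /** Hypothetical read loop over a RecordReader (sketch, not part of the patch). */
    class ReaderLoopExample {

        static long countRecords(RecordReader reader) throws Exception {
            long count = 0;
            try {
                RecordReader.RecordIterator batch;
                // readBatch() returns null once the reader is exhausted.
                while ((batch = reader.readBatch()) != null) {
                    KeyValue kv;
                    while ((kv = batch.next()) != null) {
                        count++;
                    }
                    batch.releaseBatch();
                }
            } finally {
                reader.close();
            }
            return count;
        }
    }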
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
new file mode 100644
index 0000000..5cb4a3d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/** A {@link RecordWriter} to write records and generate {@link Increment}s. */
+public class MergeTreeWriter implements RecordWriter {
+
+    private final MemTable memTable;
+
+    private final CompactManager compactManager;
+
+    private final Levels levels;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final Accumulator accumulator;
+
+    private final SstFile sstFile;
+
+    private final boolean commitForceCompact;
+
+    private final LinkedHashSet<SstFileMeta> newFiles;
+
+    private final LinkedHashSet<SstFileMeta> compactBefore;
+
+    private final LinkedHashSet<SstFileMeta> compactAfter;
+
+    private long newSequenceNumber;
+
+    public MergeTreeWriter(
+            MemTable memTable,
+            CompactManager compactManager,
+            Levels levels,
+            long maxSequenceNumber,
+            Comparator<RowData> keyComparator,
+            Accumulator accumulator,
+            SstFile sstFile,
+            boolean commitForceCompact) {
+        this.memTable = memTable;
+        this.compactManager = compactManager;
+        this.levels = levels;
+        this.newSequenceNumber = maxSequenceNumber + 1;
+        this.keyComparator = keyComparator;
+        this.accumulator = accumulator;
+        this.sstFile = sstFile;
+        this.commitForceCompact = commitForceCompact;
+        this.newFiles = new LinkedHashSet<>();
+        this.compactBefore = new LinkedHashSet<>();
+        this.compactAfter = new LinkedHashSet<>();
+    }
+
+    private long newSequenceNumber() {
+        return newSequenceNumber++;
+    }
+
+    @VisibleForTesting
+    Levels levels() {
+        return levels;
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        long sequenceNumber = newSequenceNumber();
+        boolean success = memTable.put(sequenceNumber, valueKind, key, value);
+        if (!success) {
+            flush();
+            success = memTable.put(sequenceNumber, valueKind, key, value);
+            if (!success) {
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
+        }
+    }
+
+    private void flush() throws Exception {
+        if (memTable.size() > 0) {
+            finishCompaction();
+            Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator);
+            List<SstFileMeta> files =
+                    sstFile.write(CloseableIterator.adapterForIterator(iterator), 0);
+            newFiles.addAll(files);
+            files.forEach(levels::addLevel0File);
+            memTable.clear();
+            submitCompaction();
+        }
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        flush();
+        if (commitForceCompact) {
+            finishCompaction();
+        }
+        return drainIncrement();
+    }
+
+    @Override
+    public void sync() throws Exception {
+        finishCompaction();
+    }
+
+    private Increment drainIncrement() {
+        Increment increment =
+                new Increment(
+                        new ArrayList<>(newFiles),
+                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactAfter));
+        newFiles.clear();
+        compactBefore.clear();
+        compactAfter.clear();
+        return increment;
+    }
+
+    private void updateCompactResult(CompactManager.CompactResult result) {
+        for (SstFileMeta file : result.before()) {
+            boolean removed = compactAfter.remove(file);
+            if (removed) {
+                // This is an intermediate file (not a new data file), which is no longer needed
+                // after compaction and can be deleted directly
+                sstFile.delete(file);
+            } else {
+                compactBefore.add(file);
+            }
+        }
+        compactAfter.addAll(result.after());
+    }
+
+    private void submitCompaction() {
+        compactManager.submitCompaction(levels);
+    }
+
+    private void finishCompaction() throws ExecutionException, InterruptedException {
+        Optional<CompactManager.CompactResult> result = compactManager.finishCompaction(levels);
+        if (result.isPresent()) {
+            updateCompactResult(result.get());
+        }
+    }
+
+    @Override
+    public List<SstFileMeta> close() {
+        // delete temporary files
+        List<SstFileMeta> delete = new ArrayList<>(newFiles);
+        delete.addAll(compactAfter);
+        for (SstFileMeta file : delete) {
+            sstFile.delete(file);
+        }
+        newFiles.clear();
+        compactAfter.clear();
+        return delete;
+    }
+}
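
The updateCompactResult bookkeeping is subtle: a file in a compaction's "before" set that is still
sitting in compactAfter was produced and consumed between two commits, so it was never part of any
snapshot and can be deleted immediately. A self-contained toy model of that logic, using file names
in place of SstFileMeta (not part of the patch):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    /** Toy model of MergeTreeWriter's compaction bookkeeping. */
    class CompactBookkeepingExample {

        public static void main(String[] args) {
            Set<String> compactBefore = new LinkedHashSet<>();
            Set<String> compactAfter = new LinkedHashSet<>();

            // First compaction: a and b (from earlier commits) are rewritten into c.
            apply(compactBefore, compactAfter, Arrays.asList("a", "b"), Arrays.asList("c"));
            // Second compaction before any commit: c is rewritten into d, so c is
            // only an intermediate file and is deleted right away.
            apply(compactBefore, compactAfter, Arrays.asList("c"), Arrays.asList("d"));

            System.out.println(compactBefore); // [a, b] -> dropped from the snapshot
            System.out.println(compactAfter);  // [d]    -> added to the snapshot
        }

        static void apply(
                Set<String> compactBefore,
                Set<String> compactAfter,
                List<String> before,
                List<String> after) {
            for (String file : before) {
                if (compactAfter.remove(file)) {
                    System.out.println("delete intermediate file " + file);
                } else {
                    compactBefore.add(file);
                }
            }
            compactAfter.addAll(after);
        }
    }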
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
new file mode 100644
index 0000000..5c00ccc
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MergeTree}. */
+public class MergeTreeTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private static ExecutorService service;
+
+    private SstPathFactory fileFactory;
+
+    private Comparator<RowData> comparator;
+
+    private SstFile sstFile;
+
+    private MergeTree mergeTree;
+
+    private RecordWriter writer;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
+        Path bucketDir = fileFactory.toPath("ignore").getParent();
+        bucketDir.getFileSystem().mkdirs(bucketDir);
+
+        comparator = Comparator.comparingInt(o -> o.getInt(0));
+        Configuration configuration = new Configuration();
+        configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
+        MergeTreeOptions options = new MergeTreeOptions(configuration);
+        sstFile =
+                new SstFile(
+                        new RowType(singletonList(new RowType.RowField("k", new IntType()))),
+                        new RowType(singletonList(new RowType.RowField("v", new IntType()))),
+                        new SstFileTest.FlushingAvroFormat(),
+                        fileFactory,
+                        options.targetFileSize);
+        mergeTree =
+                new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator());
+        writer = mergeTree.createWriter(new ArrayList<>());
+    }
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testEmpty() throws Exception {
+        doTestWriteRead(0);
+    }
+
+    @Test
+    public void test1() throws Exception {
+        doTestWriteRead(1);
+    }
+
+    @Test
+    public void test2() throws Exception {
+        doTestWriteRead(new Random().nextInt(2));
+    }
+
+    @Test
+    public void test8() throws Exception {
+        doTestWriteRead(new Random().nextInt(8));
+    }
+
+    @Test
+    public void testRandom() throws Exception {
+        doTestWriteRead(new Random().nextInt(20));
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        List<TestRecord> expected = new ArrayList<>(writeBatch());
+        List<SstFileMeta> newFiles = writer.prepareCommit().newFiles();
+        writer = mergeTree.createWriter(newFiles);
+        expected.addAll(writeBatch());
+        writer.prepareCommit();
+        writer.sync();
+        assertRecords(expected);
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        doTestWriteRead(6);
+        List<SstFileMeta> files = writer.close();
+        for (SstFileMeta file : files) {
+            Path path = fileFactory.toPath(file.fileName());
+            assertThat(path.getFileSystem().exists(path)).isFalse();
+        }
+    }
+
+    @Test
+    public void testWriteMany() throws Exception {
+        doTestWriteRead(3, 20_000);
+    }
+
+    private void doTestWriteRead(int batchNumber) throws Exception {
+        doTestWriteRead(batchNumber, 200);
+    }
+
+    private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
+        List<TestRecord> expected = new ArrayList<>();
+        List<SstFileMeta> newFiles = new ArrayList<>();
+        Set<String> newFileNames = new HashSet<>();
+        List<SstFileMeta> compactedFiles = new ArrayList<>();
+
+        // write batch and commit
+        for (int i = 0; i <= batchNumber; i++) {
+            if (i < batchNumber) {
+                expected.addAll(writeBatch(perBatch));
+            } else {
+                writer.sync();
+            }
+
+            Increment increment = writer.prepareCommit();
+            newFiles.addAll(increment.newFiles());
+            increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
+
+            // merge compacted
+            compactedFiles.addAll(increment.newFiles());
+            for (SstFileMeta file : increment.compactBefore()) {
+                boolean remove = compactedFiles.remove(file);
+                assertThat(remove).isTrue();
+                if (!newFileNames.contains(file.fileName())) {
+                    sstFile.delete(file);
+                }
+            }
+            compactedFiles.addAll(increment.compactAfter());
+        }
+
+        // assert records from writer
+        assertRecords(expected);
+
+        // assert records from increment new files
+        assertRecords(expected, newFiles, false);
+        assertRecords(expected, newFiles, true);
+
+        // assert records from increment compacted files
+        assertRecords(expected, compactedFiles, true);
+
+        Path bucketDir = fileFactory.toPath("ignore").getParent();
+        Set<String> files =
+                Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
+                        .map(FileStatus::getPath)
+                        .map(Path::getName)
+                        .collect(Collectors.toSet());
+        newFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+        compactedFiles.stream().map(SstFileMeta::fileName).forEach(files::remove);
+        assertThat(files).isEqualTo(Collections.emptySet());
+    }
+
+    private List<TestRecord> writeBatch() throws Exception {
+        return writeBatch(200);
+    }
+
+    private List<TestRecord> writeBatch(int perBatch) throws Exception {
+        List<TestRecord> records = generateRandom(perBatch);
+        writeAll(records);
+        return records;
+    }
+
+    private void assertRecords(List<TestRecord> expected) throws Exception {
+        // compaction will drop delete
+        List<SstFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles();
+        assertRecords(expected, files, true);
+    }
+
+    private void assertRecords(
+            List<TestRecord> expected, List<SstFileMeta> files, boolean dropDelete)
+            throws Exception {
+        assertThat(readAll(files, dropDelete)).isEqualTo(compactAndSort(expected, dropDelete));
+    }
+
+    private List<TestRecord> compactAndSort(List<TestRecord> records, boolean dropDelete) {
+        TreeMap<Integer, TestRecord> map = new TreeMap<>();
+        for (TestRecord record : records) {
+            map.put(record.k, record);
+        }
+        if (dropDelete) {
+            return map.values().stream()
+                    .filter(record -> record.kind == ValueKind.ADD)
+                    .collect(Collectors.toList());
+        }
+        return new ArrayList<>(map.values());
+    }
+
+    private void writeAll(List<TestRecord> records) throws Exception {
+        for (TestRecord record : records) {
+            writer.write(record.kind, row(record.k), row(record.v));
+        }
+    }
+
+    private List<TestRecord> readAll(List<SstFileMeta> files, boolean dropDelete) throws Exception {
+        RecordReader reader =
+                mergeTree.createReader(
+                        new IntervalPartition(files, comparator).partition(), dropDelete);
+        List<TestRecord> records = new ArrayList<>();
+        try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) {
+            while (iterator.hasNext()) {
+                KeyValue kv = iterator.next();
+                records.add(
+                        new TestRecord(kv.valueKind(), kv.key().getInt(0), kv.value().getInt(0)));
+            }
+        }
+        return records;
+    }
+
+    private RowData row(int i) {
+        return GenericRowData.of(i);
+    }
+
+    private List<TestRecord> generateRandom(int perBatch) {
+        Random random = new Random();
+        List<TestRecord> records = new ArrayList<>(perBatch);
+        for (int i = 0; i < perBatch; i++) {
+            records.add(
+                    new TestRecord(
+                            random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+                            random.nextInt(perBatch / 2),
+                            random.nextInt()));
+        }
+        return records;
+    }
+
+    private static class TestRecord {
+
+        private final ValueKind kind;
+        private final int k;
+        private final int v;
+
+        private TestRecord(ValueKind kind, int k, int v) {
+            this.kind = kind;
+            this.k = k;
+            this.v = v;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestRecord that = (TestRecord) o;
+            return k == that.k && v == that.v && kind == that.kind;
+        }
+
+        @Override
+        public String toString() {
+            return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + '}';
+        }
+    }
+}

[flink-table-store] 03/04: [FLINK-25630] Introduce MergeTreeOptions

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 58d50fb35e37dd4f8482df4fd83de06b3d211791
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:38:47 2022 +0800

    [FLINK-25630] Introduce MergeTreeOptions
---
 .../store/file/mergetree/MergeTreeOptions.java     | 132 +++++++++++++++++++++
 1 file changed, 132 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
new file mode 100644
index 0000000..d6d1ad9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for merge tree. */
+public class MergeTreeOptions {
+
+    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
+            ConfigOptions.key("write-buffer-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64 mb"))
+                    .withDescription(
+                            "Amount of data to build up in memory before converting to a sorted on-disk file.");
+
+    public static final ConfigOption<MemorySize> PAGE_SIZE =
+            ConfigOptions.key("page-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("1 mb"))
+                    .withDescription("Memory page size.");
+
+    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
+            ConfigOptions.key("target-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("Target size of a file.");
+
+    public static final ConfigOption<Integer> NUM_SORTED_RUNS_MAX =
+            ConfigOptions.key("num-sorted-run.max")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The max sorted run number. Includes level0 files (one file one sorted run) and "
+                                    + "high-level runs (one level one sorted run).");
+
+    public static final ConfigOption<Integer> NUM_LEVELS =
+            ConfigOptions.key("num-levels")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription(
+                            "Total level number, for example, there are 3 levels, including 0,1,2 levels.");
+
+    public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
+            ConfigOptions.key("commit.force-compact")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to force a compaction before commit.");
+
+    public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
+            ConfigOptions.key("compaction.max-size-amplification-percent")
+                    .intType()
+                    .defaultValue(200)
+                    .withDescription(
+                            "The size amplification is defined as the amount (in percentage) of additional storage "
+                                    + "needed to store a single byte of data in the merge tree.");
+
+    public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
+            ConfigOptions.key("compaction.size-ratio")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+                                    + "size is 1% smaller than the next sorted run's size, then include next sorted run "
+                                    + "into this candidate set.");
+
+    public final long writeBufferSize;
+
+    public final long pageSize;
+
+    public final long targetFileSize;
+
+    public final int numSortedRunMax;
+
+    public final int numLevels;
+
+    public final boolean commitForceCompact;
+
+    public final int maxSizeAmplificationPercent;
+
+    public final int sizeRatio;
+
+    public MergeTreeOptions(
+            long writeBufferSize,
+            long pageSize,
+            long targetFileSize,
+            int numSortedRunMax,
+            int numLevels,
+            boolean commitForceCompact,
+            int maxSizeAmplificationPercent,
+            int sizeRatio) {
+        this.writeBufferSize = writeBufferSize;
+        this.pageSize = pageSize;
+        this.targetFileSize = targetFileSize;
+        this.numSortedRunMax = numSortedRunMax;
+        this.numLevels = numLevels;
+        this.commitForceCompact = commitForceCompact;
+        this.maxSizeAmplificationPercent = maxSizeAmplificationPercent;
+        this.sizeRatio = sizeRatio;
+    }
+
+    public MergeTreeOptions(ReadableConfig config) {
+        this(
+                config.get(WRITE_BUFFER_SIZE).getBytes(),
+                config.get(PAGE_SIZE).getBytes(),
+                config.get(TARGET_FILE_SIZE).getBytes(),
+                config.get(NUM_SORTED_RUNS_MAX),
+                config.get(NUM_LEVELS),
+                config.get(COMMIT_FORCE_COMPACT),
+                config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
+                config.get(COMPACTION_SIZE_RATIO));
+    }
+}
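
A minimal sketch of overriding a couple of defaults through a Flink Configuration, mirroring the
setup in MergeTreeTest from commit 04/04; the MergeTreeOptionsExample helper name is hypothetical.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;

    /** Hypothetical construction of MergeTreeOptions (sketch, not part of the patch). */
    class MergeTreeOptionsExample {

        static MergeTreeOptions smallBufferOptions() {
            Configuration conf = new Configuration();
            conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
            conf.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
            // Unset keys fall back to the defaults above (e.g. a 128 MB
            // target-file-size and 4 levels).
            return new MergeTreeOptions(conf);
        }
    }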