Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 09:30:18 UTC

[flink-table-store] 01/03: [FLINK-25629] Introduce IntervalPartition

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 7a9bca33d7189d157c39401139a988181348a909
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:00:48 2022 +0800

    [FLINK-25629] Introduce IntervalPartition
    
    Co-authored-by: tsreaper <ts...@gmail.com>
---
 .../table/store/file/mergetree/SortedRun.java      | 112 ++++++++++++
 .../file/mergetree/compact/IntervalPartition.java  | 126 ++++++++++++++
 .../mergetree/compact/IntervalPartitionTest.java   | 191 +++++++++++++++++++++
 3 files changed, 429 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java
new file mode 100644
index 0000000..824bc3d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SortedRun} is a list of files sorted by their keys. The key intervals [minKey, maxKey]
+ * of these files do not overlap.
+ */
+public class SortedRun {
+
+    private final List<SstFileMeta> files;
+
+    private final long totalSize;
+
+    private SortedRun(List<SstFileMeta> files) {
+        this.files = Collections.unmodifiableList(files);
+        long totalSize = 0L;
+        for (SstFileMeta file : files) {
+            totalSize += file.fileSize();
+        }
+        this.totalSize = totalSize;
+    }
+
+    public static SortedRun empty() {
+        return new SortedRun(Collections.emptyList());
+    }
+
+    public static SortedRun fromSingle(SstFileMeta file) {
+        return new SortedRun(Collections.singletonList(file));
+    }
+
+    public static SortedRun fromSorted(List<SstFileMeta> sortedFiles) {
+        return new SortedRun(sortedFiles);
+    }
+
+    public static SortedRun fromUnsorted(
+            List<SstFileMeta> unsortedFiles, Comparator<RowData> keyComparator) {
+        unsortedFiles.sort((o1, o2) -> keyComparator.compare(o1.minKey(), o2.minKey()));
+        SortedRun run = new SortedRun(unsortedFiles);
+        run.validate(keyComparator);
+        return run;
+    }
+
+    public List<SstFileMeta> files() {
+        return files;
+    }
+
+    public boolean nonEmpty() {
+        return !files.isEmpty();
+    }
+
+    public long totalSize() {
+        return totalSize;
+    }
+
+    @VisibleForTesting
+    public void validate(Comparator<RowData> comparator) {
+        for (int i = 1; i < files.size(); i++) {
+            Preconditions.checkState(
+                    comparator.compare(files.get(i).minKey(), files.get(i - 1).maxKey()) > 0,
+                    "SortedRun is not sorted and may contain overlapping key intervals. This is a bug.");
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SortedRun)) {
+            return false;
+        }
+        SortedRun that = (SortedRun) o;
+        return files.equals(that.files);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(files);
+    }
+
+    @Override
+    public String toString() {
+        return "["
+                + files.stream().map(SstFileMeta::toString).collect(Collectors.joining(", "))
+                + "]";
+    }
+}
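
Not part of the patch, but for readers of this thread: a minimal usage sketch of the SortedRun
factories. It assumes the integer key comparator and the makeInterval(...) helper defined in
IntervalPartitionTest further down in this commit; imports are elided.

Comparator<RowData> cmp = (a, b) -> a.getInt(0) - b.getInt(0);

// Three files with non-overlapping key intervals, given in arbitrary order.
List<SstFileMeta> files =
        new ArrayList<>(Arrays.asList(makeInterval(7, 9), makeInterval(1, 3), makeInterval(4, 6)));

// fromUnsorted sorts the files by minKey and then validates that
// minKey(file i) > maxKey(file i - 1) holds for every adjacent pair.
SortedRun run = SortedRun.fromUnsorted(files, cmp);
run.files();      // files ordered as [1, 3], [4, 6], [7, 9]
run.totalSize();  // sum of the three file sizes

// Overlapping intervals such as [1, 5] and [4, 8] would instead fail validation with
// "SortedRun is not sorted and may contain overlapping key intervals. This is a bug."
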
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java
new file mode 100644
index 0000000..735246f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+/** Algorithm to partition several sst files into the minimum number of {@link SortedRun}s. */
+public class IntervalPartition {
+
+    private final List<SstFileMeta> files;
+    private final Comparator<RowData> keyComparator;
+
+    public IntervalPartition(List<SstFileMeta> inputFiles, Comparator<RowData> keyComparator) {
+        this.files = new ArrayList<>(inputFiles);
+        this.files.sort(
+                (o1, o2) -> {
+                    int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
+                    return leftResult == 0
+                            ? keyComparator.compare(o1.maxKey(), o2.maxKey())
+                            : leftResult;
+                });
+        this.keyComparator = keyComparator;
+    }
+
+    /**
+     * Returns a two-dimensional list of {@link SortedRun}s.
+     *
+     * <p>The elements of the outer list are sections. Key intervals between sections do not
+     * overlap. This extra layer is to minimize the number of {@link SortedRun}s that must be
+     * dealt with at the same time.
+     *
+     * <p>The elements of the inner list are {@link SortedRun}s within a section.
+     *
+     * <p>Users are expected to use the results in the following way:
+     *
+     * <pre>{@code
+     * for (List<SortedRun> section : algorithm.partition()) {
+     *     // do some merge sorting within section
+     * }
+     * }</pre>
+     */
+    public List<List<SortedRun>> partition() {
+        List<List<SortedRun>> result = new ArrayList<>();
+        List<SstFileMeta> section = new ArrayList<>();
+        BinaryRowData bound = null;
+
+        for (SstFileMeta meta : files) {
+            if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
+                // larger than current right bound, conclude current section and create a new one
+                result.add(partition(section));
+                section.clear();
+                bound = null;
+            }
+            section.add(meta);
+            if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
+                // update right bound
+                bound = meta.maxKey();
+            }
+        }
+        if (!section.isEmpty()) {
+            // conclude last section
+            result.add(partition(section));
+        }
+
+        return result;
+    }
+
+    private List<SortedRun> partition(List<SstFileMeta> metas) {
+        PriorityQueue<List<SstFileMeta>> queue =
+                new PriorityQueue<>(
+                        (o1, o2) ->
+                                // sort by max key of the last sst file
+                                keyComparator.compare(
+                                        o1.get(o1.size() - 1).maxKey(),
+                                        o2.get(o2.size() - 1).maxKey()));
+        // create the initial sorted run
+        List<SstFileMeta> firstRun = new ArrayList<>();
+        firstRun.add(metas.get(0));
+        queue.add(firstRun);
+
+        for (int i = 1; i < metas.size(); i++) {
+            SstFileMeta meta = metas.get(i);
+            // any file list whose last max key < meta.minKey() is sufficient;
+            // for convenience we pick the one with the smallest last max key
+            List<SstFileMeta> top = queue.poll();
+            if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {
+                // append current file to an existing sorted run
+                top.add(meta);
+            } else {
+                // create a new sorted run
+                List<SstFileMeta> newRun = new ArrayList<>();
+                newRun.add(meta);
+                queue.add(newRun);
+            }
+            queue.add(top);
+        }
+
+        // order between sorted runs does not matter
+        return queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());
+    }
+}
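
Not part of the patch: to make the greedy step in partition(List<SstFileMeta>) above easier to
follow, below is a self-contained sketch of the same min-heap strategy on plain int intervals.
The class and method names are invented for the example, and unlike the class above it skips the
section split and partitions all intervals in one pass.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class GreedyIntervalPartitionSketch {

    /** Partitions closed intervals [min, max] into the minimum number of non-overlapping runs. */
    static List<List<int[]>> partition(List<int[]> intervals) {
        intervals.sort(
                Comparator.<int[]>comparingInt(i -> i[0]).thenComparingInt(i -> i[1]));
        // runs ordered by the max key of their last interval, smallest first
        PriorityQueue<List<int[]>> runs =
                new PriorityQueue<>(
                        Comparator.comparingInt((List<int[]> r) -> r.get(r.size() - 1)[1]));
        for (int[] interval : intervals) {
            List<int[]> top = runs.poll();
            if (top != null && interval[0] > top.get(top.size() - 1)[1]) {
                // strictly after the smallest max key: extend that run
                top.add(interval);
                runs.add(top);
            } else {
                // overlaps even the run with the smallest max key, so a new run is unavoidable
                if (top != null) {
                    runs.add(top);
                }
                List<int[]> newRun = new ArrayList<>();
                newRun.add(interval);
                runs.add(newRun);
            }
        }
        return new ArrayList<>(runs);
    }

    public static void main(String[] args) {
        // the intervals from testSectionPartitioning below; the result is two runs,
        // [5, 7], [9, 15], [16, 28]  and  [0, 9], [16, 22], [24, 32]
        List<int[]> input = new ArrayList<>(Arrays.asList(
                new int[] {0, 9}, new int[] {5, 7}, new int[] {9, 15},
                new int[] {16, 22}, new int[] {16, 28}, new int[] {24, 32}));
        for (List<int[]> run : partition(input)) {
            StringBuilder sb = new StringBuilder();
            for (int[] i : run) {
                sb.append("[").append(i[0]).append(", ").append(i[1]).append("] ");
            }
            System.out.println(sb.toString().trim());
        }
    }
}
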
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
new file mode 100644
index 0000000..f59c1c2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link IntervalPartition}. */
+public class IntervalPartitionTest {
+
+    private static final RecordComparator COMPARATOR =
+            (RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0);
+
+    @Test
+    public void testSameMinKey() {
+        runTest(
+                "[100, 200], [100, 400], [100, 300], [100, 500]",
+                "[100, 200] | [100, 300] | [100, 400] | [100, 500]");
+    }
+
+    @Test
+    public void testSameMaxKey() {
+        runTest(
+                "[100, 500], [300, 500], [200, 500], [400, 500]",
+                "[100, 500] | [200, 500] | [300, 500] | [400, 500]");
+    }
+
+    @Test
+    public void testSectionPartitioning() {
+        // 0    5    10   15   20   25   30
+        // |--------|
+        //      |-|
+        //          |-----|
+        //                 |-----|
+        //                 |-----------|
+        //                         |-------|
+        // 0    5    10   15   20   25   30
+        runTest(
+                "[0, 9], [5, 7], [9, 15], [16, 22], [16, 28], [24, 32]",
+                "[0, 9] | [5, 7], [9, 15]\n" + "[16, 22], [24, 32] | [16, 28]");
+    }
+
+    private void runTest(String in, String ans) {
+        IntervalPartition algorithm = new IntervalPartition(parseMetas(in), COMPARATOR);
+        List<List<SortedRun>> expected = new ArrayList<>();
+        for (String line : ans.split("\n")) {
+            expected.add(parseSortedRuns(line));
+        }
+
+        List<List<SortedRun>> actual = algorithm.partition();
+        for (List<SortedRun> section : actual) {
+            for (SortedRun sortedRun : section) {
+                sortedRun.validate(COMPARATOR);
+            }
+        }
+
+        // compare the results as multisets because the order of sorted runs within a section
+        // does not matter
+        assertThat(toMultiset(actual)).isEqualTo(toMultiset(expected));
+    }
+
+    @RepeatedTest(100)
+    public void randomTest() {
+        ThreadLocalRandom r = ThreadLocalRandom.current();
+        List<int[]> intervals = new ArrayList<>();
+        // construct some sorted runs
+        int numSortedRuns = r.nextInt(10) + 1;
+        for (int i = 0; i < numSortedRuns; i++) {
+            int numIntervals = r.nextInt(10) + 1;
+            // pick 2 * numIntervals distinct integers to make intervals
+            Set<Integer> set = new TreeSet<>();
+            while (set.size() < 2 * numIntervals) {
+                int x;
+                do {
+                    x = r.nextInt(1000);
+                } while (set.contains(x));
+                set.add(x);
+            }
+            List<Integer> ints = new ArrayList<>(set);
+            for (int j = 0; j < 2 * numIntervals; j += 2) {
+                intervals.add(new int[] {ints.get(j), ints.get(j + 1)});
+            }
+        }
+        // encode the intervals as the input string
+        String input =
+                intervals.stream()
+                        .map(a -> String.format("[%d, %d]", a[0], a[1]))
+                        .collect(Collectors.joining(", "));
+        // after partitioning, no section may contain more than numSortedRuns sorted runs
+        IntervalPartition algorithm = new IntervalPartition(parseMetas(input), COMPARATOR);
+        List<List<SortedRun>> result = algorithm.partition();
+        for (List<SortedRun> section : result) {
+            assertTrue(section.size() <= numSortedRuns);
+            for (SortedRun sortedRun : section) {
+                sortedRun.validate(COMPARATOR);
+            }
+        }
+    }
+
+    private List<SortedRun> parseSortedRuns(String in) {
+        List<SortedRun> sortedRuns = new ArrayList<>();
+        for (String s : in.split("\\|")) {
+            sortedRuns.add(SortedRun.fromSorted(parseMetas(s)));
+        }
+        return sortedRuns;
+    }
+
+    private List<SstFileMeta> parseMetas(String in) {
+        List<SstFileMeta> metas = new ArrayList<>();
+        Pattern pattern = Pattern.compile("\\[(\\d+?), (\\d+?)]");
+        Matcher matcher = pattern.matcher(in);
+        while (matcher.find()) {
+            metas.add(
+                    makeInterval(
+                            Integer.parseInt(matcher.group(1)),
+                            Integer.parseInt(matcher.group(2))));
+        }
+        return metas;
+    }
+
+    private SstFileMeta makeInterval(int left, int right) {
+        BinaryRowData minKey = new BinaryRowData(1);
+        BinaryRowWriter minWriter = new BinaryRowWriter(minKey);
+        minWriter.writeInt(0, left);
+        minWriter.complete();
+        BinaryRowData maxKey = new BinaryRowData(1);
+        BinaryRowWriter maxWriter = new BinaryRowWriter(maxKey);
+        maxWriter.writeInt(0, right);
+        maxWriter.complete();
+
+        return new SstFileMeta(
+                "DUMMY",
+                100,
+                25,
+                minKey,
+                maxKey,
+                new FieldStats[] {new FieldStats(left, right, 0)},
+                0,
+                24,
+                0);
+    }
+
+    private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> sections) {
+        List<Map<SortedRun, Integer>> result = new ArrayList<>();
+        for (List<SortedRun> section : sections) {
+            Map<SortedRun, Integer> multiset = new HashMap<>();
+            for (SortedRun sortedRun : section) {
+                multiset.compute(sortedRun, (k, v) -> v == null ? 1 : v + 1);
+            }
+            result.add(multiset);
+        }
+        return result;
+    }
+}
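
Not part of the patch: a hedged sketch of how a caller is expected to consume partition(),
following the loop shown in the IntervalPartition Javadoc. Here files and keyComparator stand
for the caller's inputs, and the merge step itself is outside the scope of this commit, so only
the structure is shown.

IntervalPartition algorithm = new IntervalPartition(files, keyComparator);
for (List<SortedRun> section : algorithm.partition()) {
    // Sections never overlap in key range, so each one can be processed independently.
    // Within a section the sorted runs may overlap each other, which is why their files
    // have to be merge-sorted together rather than simply concatenated.
    List<SstFileMeta> toMerge = new ArrayList<>();
    for (SortedRun run : section) {
        toMerge.addAll(run.files());
    }
    // ... feed toMerge into a k-way merge keyed by keyComparator ...
}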