You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 09:30:18 UTC
[flink-table-store] 01/03: [FLINK-25629] Introduce IntervalPartition
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 7a9bca33d7189d157c39401139a988181348a909
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jan 13 17:00:48 2022 +0800
[FLINK-25629] Introduce IntervalPartition
Co-authored-by: tsreaper <ts...@gmail.com>
---
.../table/store/file/mergetree/SortedRun.java | 112 ++++++++++++
.../file/mergetree/compact/IntervalPartition.java | 126 ++++++++++++++
.../mergetree/compact/IntervalPartitionTest.java | 191 +++++++++++++++++++++
3 files changed, 429 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java
new file mode 100644
index 0000000..824bc3d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortedRun.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SortedRun} is a list of files sorted by their keys. The key intervals [minKey, maxKey]
+ * of these files do not overlap.
+ *
+ * <p>Only {@link #fromUnsorted} sorts and validates its input. The other factories trust the
+ * caller to pass files that already satisfy the invariant; call {@link #validate} to check it
+ * explicitly.
+ */
+public class SortedRun {
+
+    /** Read-only view of the files of this run. */
+    private final List<SstFileMeta> files;
+
+    /** Sum of {@code fileSize()} over all files, computed once at construction time. */
+    private final long totalSize;
+
+    private SortedRun(List<SstFileMeta> files) {
+        this.files = Collections.unmodifiableList(files);
+        long totalSize = 0L;
+        for (SstFileMeta file : files) {
+            totalSize += file.fileSize();
+        }
+        this.totalSize = totalSize;
+    }
+
+    /** Creates a run without any file. */
+    public static SortedRun empty() {
+        return new SortedRun(Collections.emptyList());
+    }
+
+    /** Creates a run consisting of exactly one file. */
+    public static SortedRun fromSingle(SstFileMeta file) {
+        return new SortedRun(Collections.singletonList(file));
+    }
+
+    /**
+     * Creates a run from files which are already sorted. The list is wrapped, not copied, and no
+     * validation is performed.
+     *
+     * @param sortedFiles files of the run, in key order
+     */
+    public static SortedRun fromSorted(List<SstFileMeta> sortedFiles) {
+        return new SortedRun(sortedFiles);
+    }
+
+    /**
+     * Creates a run from files in arbitrary order. The files are ordered by their min key and the
+     * result is validated.
+     *
+     * <p>The given list is not modified; sorting happens on a copy, so unmodifiable lists are
+     * accepted as well.
+     *
+     * @param unsortedFiles files of the run, in any order
+     * @param keyComparator comparator of the keys
+     */
+    public static SortedRun fromUnsorted(
+            List<SstFileMeta> unsortedFiles, Comparator<RowData> keyComparator) {
+        // sort a copy instead of the argument so that the caller's list is left untouched
+        List<SstFileMeta> sortedFiles =
+                unsortedFiles.stream()
+                        .sorted((o1, o2) -> keyComparator.compare(o1.minKey(), o2.minKey()))
+                        .collect(Collectors.toList());
+        SortedRun run = new SortedRun(sortedFiles);
+        run.validate(keyComparator);
+        return run;
+    }
+
+    /** Returns the files of this run as an unmodifiable list. */
+    public List<SstFileMeta> files() {
+        return files;
+    }
+
+    /** Returns whether this run contains at least one file. */
+    public boolean nonEmpty() {
+        return !files.isEmpty();
+    }
+
+    /** Returns the total size of all files of this run. */
+    public long totalSize() {
+        return totalSize;
+    }
+
+    /**
+     * Checks that the min key of each file is strictly larger than the max key of the file before
+     * it. The check is done with {@code Preconditions.checkState}.
+     *
+     * @param comparator comparator of the keys
+     */
+    @VisibleForTesting
+    public void validate(Comparator<RowData> comparator) {
+        for (int i = 1; i < files.size(); i++) {
+            Preconditions.checkState(
+                    comparator.compare(files.get(i).minKey(), files.get(i - 1).maxKey()) > 0,
+                    "SortedRun is not sorted and may contain overlapping key intervals. This is a bug.");
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SortedRun)) {
+            return false;
+        }
+        SortedRun that = (SortedRun) o;
+        // totalSize is derived from files, comparing the files is sufficient
+        return files.equals(that.files);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(files);
+    }
+
+    @Override
+    public String toString() {
+        return files.stream()
+                .map(SstFileMeta::toString)
+                .collect(Collectors.joining(", ", "[", "]"));
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java
new file mode 100644
index 0000000..735246f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartition.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+/** Algorithm to partition several sst files into the minimum number of {@link SortedRun}s. */
+public class IntervalPartition {
+
+    private final List<SstFileMeta> files;
+    private final Comparator<RowData> keyComparator;
+
+    /**
+     * Copies the input files and orders the copy by min key, using the max key to break ties.
+     *
+     * @param inputFiles files to partition; this list itself is not modified
+     * @param keyComparator comparator of the keys
+     */
+    public IntervalPartition(List<SstFileMeta> inputFiles, Comparator<RowData> keyComparator) {
+        this.keyComparator = keyComparator;
+        this.files = new ArrayList<>(inputFiles);
+        this.files.sort(
+                (left, right) -> {
+                    int byMinKey = keyComparator.compare(left.minKey(), right.minKey());
+                    if (byMinKey != 0) {
+                        return byMinKey;
+                    }
+                    return keyComparator.compare(left.maxKey(), right.maxKey());
+                });
+    }
+
+    /**
+     * Computes the partitioning as a two-dimensional list of {@link SortedRun}s.
+     *
+     * <p>Each element of the outer list is a section. Key intervals of different sections do not
+     * overlap. This extra layer keeps the number of {@link SortedRun}s which have to be dealt
+     * with at the same time as small as possible.
+     *
+     * <p>Each element of an inner list is a {@link SortedRun} of that section.
+     *
+     * <p>The result is meant to be consumed like this:
+     *
+     * <pre>{@code
+     * for (List<SortedRun> section : algorithm.partition()) {
+     *     // do some merge sorting within section
+     * }
+     * }</pre>
+     */
+    public List<List<SortedRun>> partition() {
+        List<List<SortedRun>> sections = new ArrayList<>();
+        List<SstFileMeta> current = new ArrayList<>();
+        // largest max key seen in the current section, null while the section is empty
+        BinaryRowData rightBound = null;
+
+        for (SstFileMeta file : files) {
+            boolean beyondBound =
+                    !current.isEmpty() && keyComparator.compare(file.minKey(), rightBound) > 0;
+            if (beyondBound) {
+                // file starts after everything in the current section, so the section is done
+                sections.add(partition(current));
+                current = new ArrayList<>();
+                rightBound = null;
+            }
+            current.add(file);
+            if (rightBound == null || keyComparator.compare(file.maxKey(), rightBound) > 0) {
+                // extend the right bound of the section
+                rightBound = file.maxKey();
+            }
+        }
+        if (!current.isEmpty()) {
+            // the last section is still pending
+            sections.add(partition(current));
+        }
+
+        return sections;
+    }
+
+    /**
+     * Splits the files of one section into sorted runs.
+     *
+     * @param metas files of the section in the order established by the constructor; must not be
+     *     empty
+     */
+    private List<SortedRun> partition(List<SstFileMeta> metas) {
+        // runs are ordered by the max key of their last sst file
+        Comparator<List<SstFileMeta>> byLastMaxKey =
+                (a, b) -> keyComparator.compare(lastOf(a).maxKey(), lastOf(b).maxKey());
+        PriorityQueue<List<SstFileMeta>> runs = new PriorityQueue<>(byLastMaxKey);
+
+        // the first file always opens the first run
+        List<SstFileMeta> initial = new ArrayList<>();
+        initial.add(metas.get(0));
+        runs.add(initial);
+
+        for (SstFileMeta meta : metas.subList(1, metas.size())) {
+            // any run whose last max key < meta.minKey() would do,
+            // the one with the smallest last max key is the most convenient to get
+            List<SstFileMeta> candidate = runs.poll();
+            SstFileMeta tail = lastOf(candidate);
+            if (keyComparator.compare(meta.minKey(), tail.maxKey()) <= 0) {
+                // overlaps even with the best candidate, a new run is required
+                List<SstFileMeta> fresh = new ArrayList<>();
+                fresh.add(meta);
+                runs.add(fresh);
+            } else {
+                // fits behind the candidate run
+                candidate.add(meta);
+            }
+            // the candidate goes back to the queue in both cases
+            runs.add(candidate);
+        }
+
+        // order between runs does not matter
+        return runs.stream().map(SortedRun::fromSorted).collect(Collectors.toList());
+    }
+
+    /** Returns the last file of a non-empty run. */
+    private static SstFileMeta lastOf(List<SstFileMeta> run) {
+        return run.get(run.size() - 1);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
new file mode 100644
index 0000000..f59c1c2
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.stats.FieldStats;
+
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link IntervalPartition}. */
+public class IntervalPartitionTest {
+
+    // Integer.compare instead of subtraction, the difference of two ints may overflow
+    private static final RecordComparator COMPARATOR =
+            (o1, o2) -> Integer.compare(o1.getInt(0), o2.getInt(0));
+
+    /** Matches one interval of the form {@code [left, right]}; compiled once for all tests. */
+    private static final Pattern INTERVAL_PATTERN = Pattern.compile("\\[(\\d+?), (\\d+?)]");
+
+    @Test
+    public void testSameMinKey() {
+        runTest(
+                "[100, 200], [100, 400], [100, 300], [100, 500]",
+                "[100, 200] | [100, 300] | [100, 400] | [100, 500]");
+    }
+
+    @Test
+    public void testSameMaxKey() {
+        runTest(
+                "[100, 500], [300, 500], [200, 500], [400, 500]",
+                "[100, 500] | [200, 500] | [300, 500] | [400, 500]");
+    }
+
+    @Test
+    public void testSectionPartitioning() {
+        // 0    5    10   15   20   25   30
+        // |--------|
+        //      |-|
+        //          |-----|
+        //                 |-----|
+        //                 |-----------|
+        //                         |-------|
+        // 0    5    10   15   20   25   30
+        runTest(
+                "[0, 9], [5, 7], [9, 15], [16, 22], [16, 28], [24, 32]",
+                "[0, 9] | [5, 7], [9, 15]\n" + "[16, 22], [24, 32] | [16, 28]");
+    }
+
+    /**
+     * Partitions the intervals of {@code in} and compares the result with {@code ans}.
+     *
+     * @param in comma separated intervals, for example {@code "[0, 9], [5, 7]"}
+     * @param ans expected result, one section per line, sorted runs separated by {@code |}
+     */
+    private void runTest(String in, String ans) {
+        IntervalPartition algorithm = new IntervalPartition(parseMetas(in), COMPARATOR);
+        List<List<SortedRun>> expected = new ArrayList<>();
+        for (String line : ans.split("\n")) {
+            expected.add(parseSortedRuns(line));
+        }
+
+        List<List<SortedRun>> actual = algorithm.partition();
+        for (List<SortedRun> section : actual) {
+            for (SortedRun sortedRun : section) {
+                sortedRun.validate(COMPARATOR);
+            }
+        }
+
+        // compare the results with multiset because the order between sorted runs within a section
+        // does not matter
+        assertThat(toMultiset(actual)).isEqualTo(toMultiset(expected));
+    }
+
+    @RepeatedTest(100)
+    public void randomTest() {
+        ThreadLocalRandom r = ThreadLocalRandom.current();
+        List<int[]> intervals = new ArrayList<>();
+        // construct some sorted runs
+        int numSortedRuns = r.nextInt(10) + 1;
+        for (int i = 0; i < numSortedRuns; i++) {
+            int numIntervals = r.nextInt(10) + 1;
+            // pick 2 * numIntervals distinct integers to make intervals,
+            // the set silently drops duplicates so we just draw until it is large enough
+            Set<Integer> set = new TreeSet<>();
+            while (set.size() < 2 * numIntervals) {
+                set.add(r.nextInt(1000));
+            }
+            // the tree set iterates in ascending order, so neighbouring pairs are disjoint
+            List<Integer> ints = new ArrayList<>(set);
+            for (int j = 0; j < 2 * numIntervals; j += 2) {
+                intervals.add(new int[] {ints.get(j), ints.get(j + 1)});
+            }
+        }
+        // change the input to string
+        String input =
+                intervals.stream()
+                        .map(a -> String.format("[%d, %d]", a[0], a[1]))
+                        .collect(Collectors.joining(", "));
+        // maximum number of sorted runs after partitioning must not exceed numSortedRuns
+        IntervalPartition algorithm = new IntervalPartition(parseMetas(input), COMPARATOR);
+        List<List<SortedRun>> result = algorithm.partition();
+        for (List<SortedRun> section : result) {
+            assertTrue(section.size() <= numSortedRuns);
+            for (SortedRun sortedRun : section) {
+                sortedRun.validate(COMPARATOR);
+            }
+        }
+    }
+
+    /** Parses one section, its sorted runs are separated by {@code |}. */
+    private List<SortedRun> parseSortedRuns(String in) {
+        List<SortedRun> sortedRuns = new ArrayList<>();
+        for (String s : in.split("\\|")) {
+            sortedRuns.add(SortedRun.fromSorted(parseMetas(s)));
+        }
+        return sortedRuns;
+    }
+
+    /** Creates one file per {@code [left, right]} interval found in the string, in order. */
+    private List<SstFileMeta> parseMetas(String in) {
+        List<SstFileMeta> metas = new ArrayList<>();
+        Matcher matcher = INTERVAL_PATTERN.matcher(in);
+        while (matcher.find()) {
+            metas.add(
+                    makeInterval(
+                            Integer.parseInt(matcher.group(1)),
+                            Integer.parseInt(matcher.group(2))));
+        }
+        return metas;
+    }
+
+    /** Creates a dummy file whose min key is {@code left} and whose max key is {@code right}. */
+    private SstFileMeta makeInterval(int left, int right) {
+        BinaryRowData minKey = new BinaryRowData(1);
+        BinaryRowWriter minWriter = new BinaryRowWriter(minKey);
+        minWriter.writeInt(0, left);
+        minWriter.complete();
+        BinaryRowData maxKey = new BinaryRowData(1);
+        BinaryRowWriter maxWriter = new BinaryRowWriter(maxKey);
+        maxWriter.writeInt(0, right);
+        maxWriter.complete();
+
+        return new SstFileMeta(
+                "DUMMY",
+                100,
+                25,
+                minKey,
+                maxKey,
+                new FieldStats[] {new FieldStats(left, right, 0)},
+                0,
+                24,
+                0);
+    }
+
+    /** Counts the occurrences of each sorted run, separately for every section. */
+    private List<Map<SortedRun, Integer>> toMultiset(List<List<SortedRun>> sections) {
+        List<Map<SortedRun, Integer>> result = new ArrayList<>();
+        for (List<SortedRun> section : sections) {
+            Map<SortedRun, Integer> multiset = new HashMap<>();
+            for (SortedRun sortedRun : section) {
+                multiset.merge(sortedRun, 1, Integer::sum);
+            }
+            result.add(multiset);
+        }
+        return result;
+    }
+}