Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/19 11:01:44 UTC
[flink-table-store] branch master updated: [FLINK-28810] Introduce write.compaction-skip to skip compaction on write
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new fc73a0bc [FLINK-28810] Introduce write.compaction-skip to skip compaction on write
fc73a0bc is described below
commit fc73a0bc4427741b41cb4b15e75c2dba363b97fb
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Sep 19 19:01:38 2022 +0800
[FLINK-28810] Introduce write.compaction-skip to skip compaction on write
This closes #262
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../org/apache/flink/table/store/CoreOptions.java | 14 ++++-
.../table/store/file/AppendOnlyFileStore.java | 6 +--
.../table/store/file/compact/CompactManager.java | 34 +++++++-----
.../store/file/compact/NoopCompactManager.java | 48 +++++++++++++++++
.../store/file/data/AppendOnlyCompactManager.java | 39 ++++++++++++--
.../table/store/file/data/AppendOnlyWriter.java | 51 +++++-------------
.../flink/table/store/file/data/DataFileMeta.java | 7 +++
.../store/file/mergetree/MergeTreeWriter.java | 32 ++++-------
.../mergetree/compact/MergeTreeCompactManager.java | 30 +++++++++--
.../file/operation/AbstractFileStoreWrite.java | 7 ---
.../file/operation/AppendOnlyFileStoreWrite.java | 54 +++++++++++--------
.../file/operation/KeyValueFileStoreWrite.java | 35 +++++++-----
.../store/file/data/AppendOnlyWriterTest.java | 63 ++++++++++++----------
.../store/file/format/FileFormatSuffixTest.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 8 +--
.../compact/MergeTreeCompactManagerTest.java | 12 +++--
.../table/store/table/FileStoreTableTestBase.java | 40 ++++++++++++++
18 files changed, 324 insertions(+), 164 deletions(-)
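For context, the new flag is an ordinary CoreOptions key. A minimal sketch (not part of this commit; the wrapper class is illustrative, and it assumes CoreOptions' public Configuration constructor) of setting it and reading it back through the accessor added below:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    public class SkipCompactionExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Equivalent to 'write.compaction-skip' = 'true' in table options.
            conf.set(CoreOptions.WRITE_COMPACTION_SKIP, true);
            CoreOptions options = new CoreOptions(conf);
            // Writers consult this flag to choose between a real compact
            // manager and the NoopCompactManager introduced below.
            System.out.println(options.writeCompactionSkip()); // true
        }
    }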
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c01d1bc8..81aa677d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,5 +230,11 @@
<td><p>Enum</p></td>
<td>Specify the write mode for table.<br /><br />Possible values:<ul><li>"append-only": The table can only accept append-only insert operations. Neither data deduplication nor any primary key constraints will be done when inserting rows into table store.</li><li>"change-log": The table can accept insert/delete/update operations.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>write.compaction-skip</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to skip compaction on write.</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 31b6b1f2..72bce71a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -156,6 +156,12 @@ public class CoreOptions implements Serializable {
.defaultValue(WriteMode.CHANGE_LOG)
.withDescription("Specify the write mode for table.");
+ public static final ConfigOption<Boolean> WRITE_COMPACTION_SKIP =
+ ConfigOptions.key("write.compaction-skip")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to skip compaction on write.");
+
public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
ConfigOptions.key("source.split.target-size")
.memoryType()
@@ -458,11 +464,11 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_SIZE_RATIO);
}
- public int minFileNum() {
+ public int compactionMinFileNum() {
return options.get(COMPACTION_MIN_FILE_NUM);
}
- public int maxFileNum() {
+ public int compactionMaxFileNum() {
return options.get(COMPACTION_MAX_FILE_NUM);
}
@@ -478,6 +484,10 @@ public class CoreOptions implements Serializable {
return options.getOptional(SEQUENCE_FIELD);
}
+ public boolean writeCompactionSkip() {
+ return options.get(WRITE_COMPACTION_SKIP);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 25dee9fe..d8aa4703 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -63,14 +63,10 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
newRead(),
schemaId,
rowType,
- options.fileFormat(),
pathFactory(),
snapshotManager(),
newScan(true),
- options.targetFileSize(),
- options.minFileNum(),
- options.maxFileNum(),
- options.commitForceCompact());
+ options);
}
private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index daf062b8..3bd3b5ad 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.store.file.compact;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +42,28 @@ public abstract class CompactManager {
this.executor = executor;
}
- /** Submit a new compaction task. */
- public abstract void submitCompaction();
+ /** Whether the writer should wait for the current compaction to finish. */
+ public abstract boolean shouldWaitCompaction();
+
+ /** Add a new file. */
+ public abstract void addNewFile(DataFileMeta file);
+
+ /** Trigger a new compaction task. */
+ public abstract void triggerCompaction();
- public boolean isCompactionFinished() {
- return taskFuture == null;
+ /** Get compaction result. Wait finish if {@code blocking} is true. */
+ public abstract Optional<CompactResult> getCompactionResult(boolean blocking)
+ throws ExecutionException, InterruptedException;
+
+ public void cancelCompaction() {
+ // TODO this method may leave behind orphan files if compaction is actually finished
+ // but some CPU work still needs to be done
+ if (taskFuture != null && !taskFuture.isCancelled()) {
+ taskFuture.cancel(true);
+ }
}
- public Optional<CompactResult> finishCompaction(boolean blocking)
+ protected final Optional<CompactResult> innerGetCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException {
if (taskFuture != null) {
if (blocking || taskFuture.isDone()) {
@@ -65,12 +81,4 @@ public abstract class CompactManager {
}
return Optional.empty();
}
-
- public void cancelCompaction() {
- // TODO this method may leave behind orphan files if compaction is actually finished
- // but some CPU work still needs to be done
- if (taskFuture != null && !taskFuture.isCancelled()) {
- taskFuture.cancel(true);
- }
- }
}
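The refactor above replaces the old submitCompaction()/finishCompaction() pair with a narrower contract: addNewFile registers freshly flushed files, triggerCompaction starts a task only if none is running, and getCompactionResult harvests (optionally blocking on) the outcome. A rough sketch of the writer-side protocol, abstracted from the MergeTreeWriter and AppendOnlyWriter changes below (applyResult is a hypothetical callback, not part of this commit):

    // Illustrative only; mirrors how the writers in this commit drive the API.
    void afterFlush(CompactManager compactManager, DataFileMeta newFile, boolean endOfInput)
            throws Exception {
        compactManager.addNewFile(newFile);        // register the newly written file
        compactManager.getCompactionResult(false)  // collect a finished task, if any
                .ifPresent(this::applyResult);
        compactManager.triggerCompaction();        // silently returns if a task is in flight
        if (endOfInput) {
            // block until the pending compaction, if any, completes
            compactManager.getCompactionResult(true).ifPresent(this::applyResult);
        }
    }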
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
new file mode 100644
index 00000000..84a650fe
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/** A {@link CompactManager} which does nothing. */
+public class NoopCompactManager extends CompactManager {
+
+ public NoopCompactManager(ExecutorService executor) {
+ super(executor);
+ }
+
+ @Override
+ public boolean shouldWaitCompaction() {
+ return false;
+ }
+
+ @Override
+ public void addNewFile(DataFileMeta file) {}
+
+ @Override
+ public void triggerCompaction() {}
+
+ @Override
+ public Optional<CompactResult> getCompactionResult(boolean blocking) {
+ return Optional.empty();
+ }
+}
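Every hook degenerates under this manager: writers never stall on shouldWaitCompaction(), and even a blocking getCompactionResult(true) comes back empty, so prepareCommit and sync fall through immediately. A tiny sketch (the executor only satisfies the super constructor and is never used):

    NoopCompactManager manager =
            new NoopCompactManager(java.util.concurrent.Executors.newSingleThreadExecutor());
    // No task is ever scheduled, so none of these calls block or fail.
    assert !manager.shouldWaitCompaction();
    manager.triggerCompaction();                            // no-op
    assert !manager.getCompactionResult(true).isPresent();  // always empty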
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
index 1f27efe4..d10063bd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -58,10 +59,9 @@ public class AppendOnlyCompactManager extends CompactManager {
}
@Override
- public void submitCompaction() {
+ public void triggerCompaction() {
if (taskFuture != null) {
- throw new IllegalStateException(
- "Please finish the previous compaction before submitting new one.");
+ return;
}
pickCompactBefore()
.ifPresent(
@@ -70,6 +70,35 @@ public class AppendOnlyCompactManager extends CompactManager {
executor.submit(new AutoCompactTask(inputs, rewriter)));
}
+ @Override
+ public boolean shouldWaitCompaction() {
+ return false;
+ }
+
+ @Override
+ public void addNewFile(DataFileMeta file) {
+ toCompact.add(file);
+ }
+
+ /** Finish current task, and update result files to {@link #toCompact}. */
+ @Override
+ public Optional<CompactResult> getCompactionResult(boolean blocking)
+ throws ExecutionException, InterruptedException {
+ Optional<CompactResult> result = innerGetCompactionResult(blocking);
+ result.ifPresent(
+ r -> {
+ if (!r.after().isEmpty()) {
+ // if the last compacted file is still small,
+ // add it back to the head
+ DataFileMeta lastFile = r.after().get(r.after().size() - 1);
+ if (lastFile.fileSize() < targetFileSize) {
+ toCompact.offerFirst(lastFile);
+ }
+ }
+ });
+ return result;
+ }
+
@VisibleForTesting
Optional<List<DataFileMeta>> pickCompactBefore() {
return pick(toCompact, targetFileSize, minFileNum, maxFileNum);
@@ -80,6 +109,10 @@ public class AppendOnlyCompactManager extends CompactManager {
long targetFileSize,
int minFileNum,
int maxFileNum) {
+ if (toCompact.isEmpty()) {
+ return Optional.empty();
+ }
+
long totalFileSize = 0L;
int fileNum = 0;
LinkedList<DataFileMeta> candidates = new LinkedList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 1790e4c0..1476a287 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -19,10 +19,10 @@
package org.apache.flink.table.store.file.data;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -41,13 +41,13 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter;
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
/**
* A {@link RecordWriter} implementation that only accepts records which are always insert
@@ -60,9 +60,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
private final long targetFileSize;
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
- private final AppendOnlyCompactManager compactManager;
+ private final CompactManager compactManager;
private final boolean forceCompact;
- private final LinkedList<DataFileMeta> toCompact;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
@@ -74,8 +73,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
FileFormat fileFormat,
long targetFileSize,
RowType writeSchema,
- LinkedList<DataFileMeta> restoredFiles,
- AppendOnlyCompactManager compactManager,
+ long maxSequenceNumber,
+ CompactManager compactManager,
boolean forceCompact,
DataFilePathFactory pathFactory) {
this.schemaId = schemaId;
@@ -85,10 +84,9 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
this.pathFactory = pathFactory;
this.compactManager = compactManager;
this.forceCompact = forceCompact;
- this.toCompact = restoredFiles;
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
- this.seqNumCounter = new LongCounter(getMaxSequenceNumber(restoredFiles) + 1);
+ this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.writer =
createRollingRowWriter(
schemaId,
@@ -128,18 +126,18 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
seqNumCounter);
}
// add new generated files
- toCompact.addAll(newFiles);
+ newFiles.forEach(compactManager::addNewFile);
submitCompaction();
boolean blocking = endOfInput || forceCompact;
- finishCompaction(blocking);
+ trySyncLatestCompaction(blocking);
return drainIncrement(newFiles);
}
@Override
public void sync() throws Exception {
- finishCompaction(true);
+ trySyncLatestCompaction(true);
}
@Override
@@ -160,37 +158,19 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
return result;
}
- private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
- return fileMetas.stream()
- .map(DataFileMeta::maxSequenceNumber)
- .max(Long::compare)
- .orElse(-1L);
- }
-
private void submitCompaction() throws ExecutionException, InterruptedException {
- finishCompaction(false);
- if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
- compactManager.submitCompaction();
- }
+ trySyncLatestCompaction(false);
+ compactManager.triggerCompaction();
}
- private void finishCompaction(boolean blocking)
+ private void trySyncLatestCompaction(boolean blocking)
throws ExecutionException, InterruptedException {
compactManager
- .finishCompaction(blocking)
+ .getCompactionResult(blocking)
.ifPresent(
result -> {
compactBefore.addAll(result.before());
compactAfter.addAll(result.after());
- if (!result.after().isEmpty()) {
- // if the last compacted file is still small,
- // add it back to the head
- DataFileMeta lastFile =
- result.after().get(result.after().size() - 1);
- if (lastFile.fileSize() < targetFileSize) {
- toCompact.offerFirst(lastFile);
- }
- }
});
}
@@ -203,11 +183,6 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
return increment;
}
- @VisibleForTesting
- List<DataFileMeta> getToCompact() {
- return toCompact;
- }
-
/** Rolling file writer for append-only table. */
public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index b8d70ec0..512d285e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -294,4 +294,11 @@ public class DataFileMeta {
new RowType.RowField("_EXTRA_FILES", new ArrayType(false, newStringType(false))));
return new RowType(fields);
}
+
+ public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+ return fileMetas.stream()
+ .map(DataFileMeta::maxSequenceNumber)
+ .max(Long::compare)
+ .orElse(-1L);
+ }
}
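This helper moves here out of AppendOnlyWriter (and replaces the duplicate removed from AbstractFileStoreWrite below) so both store writers derive a restored writer's starting sequence number in one place. The -1 default keeps the usual '+ 1' arithmetic valid on an empty restore, e.g. (a sketch, not code from this commit):

    // Sequence numbering after restore; the restored file list may be empty.
    long maxSeq = DataFileMeta.getMaxSequenceNumber(java.util.Collections.emptyList()); // -1
    LongCounter seqNumCounter = new LongCounter(maxSeq + 1);                            // starts at 0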
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6c1161a3..b48942f0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -52,8 +52,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final CompactManager compactManager;
- private final Levels levels;
-
private final Comparator<RowData> keyComparator;
private final MergeFunction mergeFunction;
@@ -62,8 +60,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final boolean commitForceCompact;
- private final int numSortedRunStopTrigger;
-
private final ChangelogProducer changelogProducer;
private final LinkedHashSet<DataFileMeta> newFiles;
@@ -80,24 +76,20 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
RowType keyType,
RowType valueType,
CompactManager compactManager,
- Levels levels,
long maxSequenceNumber,
Comparator<RowData> keyComparator,
MergeFunction mergeFunction,
DataFileWriter dataFileWriter,
boolean commitForceCompact,
- int numSortedRunStopTrigger,
ChangelogProducer changelogProducer) {
this.keyType = keyType;
this.valueType = valueType;
this.compactManager = compactManager;
- this.levels = levels;
this.newSequenceNumber = maxSequenceNumber + 1;
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
this.dataFileWriter = dataFileWriter;
this.commitForceCompact = commitForceCompact;
- this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.changelogProducer = changelogProducer;
this.newFiles = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
@@ -109,8 +101,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
@VisibleForTesting
- Levels levels() {
- return levels;
+ CompactManager compactManager() {
+ return compactManager;
}
@Override
@@ -142,9 +134,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Override
public void flushMemory() throws Exception {
if (memTable.size() > 0) {
- if (levels.numberOfSortedRuns() > numSortedRunStopTrigger) {
+ if (compactManager.shouldWaitCompaction()) {
// stop writing, wait for compaction finished
- finishCompaction(true);
+ trySyncLatestCompaction(true);
}
List<String> extraFiles = new ArrayList<>();
if (changelogProducer == ChangelogProducer.INPUT) {
@@ -165,7 +157,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
file -> {
DataFileMeta fileMeta = file.copy(extraFiles);
newFiles.add(fileMeta);
- levels.addLevel0File(fileMeta);
+ compactManager.addNewFile(fileMeta);
return true;
})
.orElse(false);
@@ -183,13 +175,13 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
public Increment prepareCommit(boolean endOfInput) throws Exception {
flushMemory();
boolean blocking = endOfInput || commitForceCompact;
- finishCompaction(blocking);
+ trySyncLatestCompaction(blocking);
return drainIncrement();
}
@Override
public void sync() throws Exception {
- finishCompaction(true);
+ trySyncLatestCompaction(true);
}
private Increment drainIncrement() {
@@ -226,14 +218,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
private void submitCompaction() throws Exception {
- finishCompaction(false);
- if (compactManager.isCompactionFinished()) {
- compactManager.submitCompaction();
- }
+ trySyncLatestCompaction(false);
+ compactManager.triggerCompaction();
}
- private void finishCompaction(boolean blocking) throws Exception {
- Optional<CompactResult> result = compactManager.finishCompaction(blocking);
+ private void trySyncLatestCompaction(boolean blocking) throws Exception {
+ Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 17c160b2..64ddc5a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.store.file.mergetree.compact;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.slf4j.Logger;
@@ -46,6 +48,8 @@ public class MergeTreeCompactManager extends CompactManager {
private final long minFileSize;
+ private final int numSortedRunStopTrigger;
+
private final CompactRewriter rewriter;
public MergeTreeCompactManager(
@@ -54,20 +58,31 @@ public class MergeTreeCompactManager extends CompactManager {
CompactStrategy strategy,
Comparator<RowData> keyComparator,
long minFileSize,
+ int numSortedRunStopTrigger,
CompactRewriter rewriter) {
super(executor);
this.levels = levels;
this.strategy = strategy;
this.minFileSize = minFileSize;
+ this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.keyComparator = keyComparator;
this.rewriter = rewriter;
}
@Override
- public void submitCompaction() {
+ public boolean shouldWaitCompaction() {
+ return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
+ }
+
+ @Override
+ public void addNewFile(DataFileMeta file) {
+ levels.addLevel0File(file);
+ }
+
+ @Override
+ public void triggerCompaction() {
if (taskFuture != null) {
- throw new IllegalStateException(
- "Please finish the previous compaction before submitting new one.");
+ return;
}
strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
.ifPresent(
@@ -104,6 +119,11 @@ public class MergeTreeCompactManager extends CompactManager {
});
}
+ @VisibleForTesting
+ public Levels levels() {
+ return levels;
+ }
+
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
MergeTreeCompactTask task =
new MergeTreeCompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
@@ -123,9 +143,9 @@ public class MergeTreeCompactManager extends CompactManager {
/** Finish current task, and update result files to {@link Levels}. */
@Override
- public Optional<CompactResult> finishCompaction(boolean blocking)
+ public Optional<CompactResult> getCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException {
- Optional<CompactResult> result = super.finishCompaction(blocking);
+ Optional<CompactResult> result = innerGetCompactionResult(blocking);
result.ifPresent(r -> levels.update(r.before(), r.after()));
return result;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index c3e09745..b5fa2fe3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -56,11 +56,4 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
return existingFileMetas;
}
-
- protected long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
- return fileMetas.stream()
- .map(DataFileMeta::maxSequenceNumber)
- .max(Long::compare)
- .orElse(-1L);
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index b2bed275..8911df14 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
import org.apache.flink.table.store.file.data.AppendOnlyWriter;
import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -42,6 +45,8 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
+
/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
@@ -51,32 +56,30 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
private final long targetFileSize;
- private final int minFileNum;
- private final int maxFileNum;
+ private final int compactionMinFileNum;
+ private final int compactionMaxFileNum;
private final boolean commitForceCompact;
+ private final boolean skipCompaction;
public AppendOnlyFileStoreWrite(
AppendOnlyFileStoreRead read,
long schemaId,
RowType rowType,
- FileFormat fileFormat,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
- long targetFileSize,
- int minFileNum,
- int maxFileNum,
- boolean commitForceCompact) {
+ CoreOptions options) {
super(snapshotManager, scan);
this.read = read;
this.schemaId = schemaId;
this.rowType = rowType;
- this.fileFormat = fileFormat;
+ this.fileFormat = options.fileFormat();
this.pathFactory = pathFactory;
- this.targetFileSize = targetFileSize;
- this.maxFileNum = maxFileNum;
- this.minFileNum = minFileNum;
- this.commitForceCompact = commitForceCompact;
+ this.targetFileSize = options.targetFileSize();
+ this.compactionMinFileNum = options.compactionMinFileNum();
+ this.compactionMaxFileNum = options.compactionMaxFileNum();
+ this.commitForceCompact = options.commitForceCompact();
+ this.skipCompaction = options.writeCompactionSkip();
}
@Override
@@ -101,8 +104,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
return new AppendOnlyCompactManager.IterativeCompactTask(
compactFiles,
targetFileSize,
- minFileNum,
- maxFileNum,
+ compactionMinFileNum,
+ compactionMaxFileNum,
compactRewriter(partition, bucket),
pathFactory.createDataFilePathFactory(partition, bucket));
}
@@ -116,19 +119,26 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
// and make restore files mutable to update
LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles);
DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
+ CompactManager compactManager;
+ if (skipCompaction) {
+ compactManager = new NoopCompactManager(compactExecutor);
+ } else {
+ compactManager =
+ new AppendOnlyCompactManager(
+ compactExecutor,
+ restored,
+ compactionMinFileNum,
+ compactionMaxFileNum,
+ targetFileSize,
+ compactRewriter(partition, bucket));
+ }
return new AppendOnlyWriter(
schemaId,
fileFormat,
targetFileSize,
rowType,
- restored,
- new AppendOnlyCompactManager(
- compactExecutor,
- restored,
- minFileNum,
- maxFileNum,
- targetFileSize,
- compactRewriter(partition, bucket)),
+ getMaxSequenceNumber(restored),
+ compactManager,
commitForceCompact,
factory);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 9175bffa..9e430b01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
import org.apache.flink.table.store.file.data.DataFileWriter;
@@ -53,6 +54,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
+
/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
@@ -130,27 +133,32 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
ExecutorService compactExecutor) {
DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
- Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
+ CompactManager compactManager;
+ if (options.writeCompactionSkip()) {
+ compactManager = new NoopCompactManager(compactExecutor);
+ } else {
+ Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
+ compactManager =
+ createCompactManager(
+ partition,
+ bucket,
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ options.maxSortedRunNum()),
+ compactExecutor,
+ levels);
+ }
return new MergeTreeWriter(
dataFileWriter.keyType(),
dataFileWriter.valueType(),
- createCompactManager(
- partition,
- bucket,
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum()),
- compactExecutor,
- levels),
- levels,
+ compactManager,
getMaxSequenceNumber(restoreFiles),
keyComparator,
mergeFunction.copy(),
dataFileWriter,
options.commitForceCompact(),
- options.numSortedRunStopTrigger(),
options.changelogProducer());
}
@@ -168,6 +176,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
compactStrategy,
keyComparator,
options.targetFileSize(),
+ options.numSortedRunStopTrigger(),
rewriter);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index a16411a4..c450116c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file.data;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
@@ -48,6 +49,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
import static org.assertj.core.api.Assertions.assertThat;
/** Test the correctness for {@link AppendOnlyWriter}. */
@@ -121,7 +123,7 @@ public class AppendOnlyWriterTest {
@Test
public void testMultipleCommits() throws Exception {
- RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList());
+ RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList()).f0;
// Commit 5 continues txn.
for (int txn = 0; txn < 5; txn += 1) {
@@ -224,8 +226,11 @@ public class AppendOnlyWriterTest {
// increase target file size to test compaction
long targetFileSize = 1024 * 1024L;
- writer = createWriter(targetFileSize, true, firstInc.newFiles());
- assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles());
+ Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> writerAndToCompact =
+ createWriter(targetFileSize, true, firstInc.newFiles());
+ writer = writerAndToCompact.f0;
+ LinkedList<DataFileMeta> toCompact = writerAndToCompact.f1;
+ assertThat(toCompact).containsExactlyElementsOf(firstInc.newFiles());
writer.write(row(id, String.format("%03d", id), PART));
writer.sync();
Increment secInc = writer.prepareCommit(true);
@@ -253,10 +258,10 @@ public class AppendOnlyWriterTest {
* <3> the newFiles[round]
* with strict order
*/
- List<DataFileMeta> toCompact = new ArrayList<>(compactAfter);
- toCompact.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size()));
- toCompact.addAll(secInc.newFiles());
- assertThat(writer.getToCompact()).containsExactlyElementsOf(toCompact);
+ List<DataFileMeta> toCompactResult = new ArrayList<>(compactAfter);
+ toCompactResult.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size()));
+ toCompactResult.addAll(secInc.newFiles());
+ assertThat(toCompact).containsExactlyElementsOf(toCompactResult);
}
private FieldStats initStats(Integer min, Integer max, long nullCount) {
@@ -280,33 +285,35 @@ public class AppendOnlyWriterTest {
}
private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
- return createWriter(targetFileSize, false, Collections.emptyList());
+ return createWriter(targetFileSize, false, Collections.emptyList()).f0;
}
- private AppendOnlyWriter createWriter(
+ private Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> createWriter(
long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
- return new AppendOnlyWriter(
- SCHEMA_ID,
- fileFormat,
- targetFileSize,
- AppendOnlyWriterTest.SCHEMA,
- toCompact,
- new AppendOnlyCompactManager(
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory("compaction-thread")),
- toCompact,
- MIN_FILE_NUM,
- MAX_FILE_NUM,
+ return new Tuple2<>(
+ new AppendOnlyWriter(
+ SCHEMA_ID,
+ fileFormat,
targetFileSize,
- compactBefore ->
- compactBefore.isEmpty()
- ? Collections.emptyList()
- : Collections.singletonList(
- generateCompactAfter(compactBefore))),
- forceCompact,
- pathFactory);
+ AppendOnlyWriterTest.SCHEMA,
+ getMaxSequenceNumber(toCompact),
+ new AppendOnlyCompactManager(
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("compaction-thread")),
+ toCompact,
+ MIN_FILE_NUM,
+ MAX_FILE_NUM,
+ targetFileSize,
+ compactBefore ->
+ compactBefore.isEmpty()
+ ? Collections.emptyList()
+ : Collections.singletonList(
+ generateCompactAfter(compactBefore))),
+ forceCompact,
+ pathFactory),
+ toCompact);
}
private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 4be7c693..39ed935e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -66,7 +66,7 @@ public class FileFormatSuffixTest extends DataFileTest {
fileFormat,
10,
SCHEMA,
- toCompact,
+ 0,
new AppendOnlyCompactManager(null, toCompact, 4, 10, 10, null), // not used
false,
dataFilePathFactory);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 446cfc45..4a82d35f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -272,13 +272,11 @@ public class MergeTreeTest {
dataFileWriter.keyType(),
dataFileWriter.valueType(),
createCompactManager(dataFileWriter, service, files),
- new Levels(comparator, files, options.numLevels()),
maxSequenceNumber,
comparator,
new DeduplicateMergeFunction(),
dataFileWriter,
options.commitForceCompact(),
- options.numSortedRunStopTrigger(),
ChangelogProducer.NONE);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
@@ -312,6 +310,7 @@ public class MergeTreeTest {
strategy,
comparator,
options.targetFileSize(),
+ options.numSortedRunStopTrigger(),
rewriter);
}
@@ -346,7 +345,10 @@ public class MergeTreeTest {
private void assertRecords(List<TestRecord> expected) throws Exception {
// compaction will drop delete
- List<DataFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles();
+ List<DataFileMeta> files =
+ ((MergeTreeCompactManager) ((MergeTreeWriter) writer).compactManager())
+ .levels()
+ .allFiles();
assertRecords(expected, files, true);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index 64c46c81..a645a74e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -197,9 +197,15 @@ public class MergeTreeCompactManagerTest {
Levels levels = new Levels(comparator, files, 3);
MergeTreeCompactManager manager =
new MergeTreeCompactManager(
- service, levels, strategy, comparator, 2, testRewriter(expectedDropDelete));
- manager.submitCompaction();
- manager.finishCompaction(true);
+ service,
+ levels,
+ strategy,
+ comparator,
+ 2,
+ Integer.MAX_VALUE,
+ testRewriter(expectedDropDelete));
+ manager.triggerCompaction();
+ manager.getCompactionResult(true);
List<LevelMinMax> outputs =
levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
assertThat(outputs).isEqualTo(expected);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 74b824db..3c80b563 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -25,11 +25,14 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -52,9 +55,12 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
import static org.assertj.core.api.Assertions.assertThat;
/** Base test class for {@link FileStoreTable}. */
@@ -211,6 +217,40 @@ public abstract class FileStoreTableTestBase {
write.close();
}
+ @Test
+ public void testWriteWithoutCompaction() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(WRITE_COMPACTION_SKIP, true);
+ conf.set(COMPACTION_MAX_FILE_NUM, 5);
+ });
+
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+ for (int i = 0; i < 10; i++) {
+ write.write(GenericRowData.of(1, 1, 100L));
+ commit.commit(String.valueOf(i), write.prepareCommit(true));
+ }
+ write.close();
+
+ List<DataFileMeta> files =
+ table.newScan().plan().splits.stream()
+ .flatMap(split -> split.files().stream())
+ .collect(Collectors.toList());
+ for (DataFileMeta file : files) {
+ assertThat(file.level()).isEqualTo(0);
+ }
+
+ SnapshotManager snapshotManager = new SnapshotManager(table.location());
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ assertThat(latestSnapshotId).isNotNull();
+ for (int i = 1; i <= latestSnapshotId; i++) {
+ Snapshot snapshot = snapshotManager.snapshot(i);
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ }
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,