You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/28 06:49:57 UTC
[flink-table-store] branch master updated: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e6e62699 [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
e6e62699 is described below
commit e6e62699ef42a90d9eb2ca73b10460eb2764a586
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Nov 28 14:49:51 2022 +0800
[FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
This closes #403
---
.../sink/FullChangelogStoreWriteOperator.java | 2 +-
.../store/connector/sink/StoreCompactOperator.java | 2 +-
.../table/store/file/append/AppendOnlyWriter.java | 18 ++++++----------
.../store/file/mergetree/MergeTreeWriter.java | 25 ++++++++++------------
.../file/operation/AbstractFileStoreWrite.java | 5 +++--
.../table/store/file/operation/FileStoreWrite.java | 3 ++-
.../flink/table/store/file/utils/RecordWriter.java | 8 ++++---
.../flink/table/store/table/sink/TableWrite.java | 2 +-
.../table/store/table/sink/TableWriteImpl.java | 5 +++--
.../store/file/operation/TestCommitThread.java | 2 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 12 +++++------
11 files changed, 41 insertions(+), 43 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
index b33aa4a6..59f392cf 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
@@ -208,7 +208,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
}
compactedBuckets.add(bucket);
try {
- write.compact(bucket.f0, bucket.f1);
+ write.compact(bucket.f0, bucket.f1, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index dc8f15d8..b331059c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -99,7 +99,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
bucket);
}
try {
- write.compact(partition, bucket);
+ write.compact(partition, bucket, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index 27d34a9c..dfa94931 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -94,18 +94,19 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
}
@Override
- public void fullCompaction() throws Exception {
- flushWriter(true);
+ public void compact(boolean fullCompaction) throws Exception {
+ flushWriter(true, fullCompaction);
}
@Override
public CommitIncrement prepareCommit(boolean blocking) throws Exception {
- flushWriter(false);
+ flushWriter(false, false);
trySyncLatestCompaction(blocking || forceCompact);
return drainIncrement();
}
- private void flushWriter(boolean forcedFullCompaction) throws Exception {
+ private void flushWriter(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+ throws Exception {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (writer != null) {
writer.close();
@@ -119,7 +120,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
// add new generated files
flushedFiles.forEach(compactManager::addNewFile);
- submitCompaction(forcedFullCompaction);
+ trySyncLatestCompaction(waitForLatestCompaction);
+ compactManager.triggerCompaction(forcedFullCompaction);
newFiles.addAll(flushedFiles);
}
@@ -145,12 +147,6 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, seqNumCounter);
}
- private void submitCompaction(boolean forcedFullCompaction)
- throws ExecutionException, InterruptedException {
- trySyncLatestCompaction(forcedFullCompaction);
- compactManager.triggerCompaction(forcedFullCompaction);
- }
-
private void trySyncLatestCompaction(boolean blocking)
throws ExecutionException, InterruptedException {
compactManager
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6945dec7..f17c8b0f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -130,7 +130,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
: kv.sequenceNumber();
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
- flushWriteBuffer(false);
+ flushWriteBuffer(false, false);
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
throw new RuntimeException("Mem table is too small to hold a single element.");
@@ -139,8 +139,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
@Override
- public void fullCompaction() throws Exception {
- flushWriteBuffer(true);
+ public void compact(boolean fullCompaction) throws Exception {
+ flushWriteBuffer(true, fullCompaction);
}
@Override
@@ -152,15 +152,15 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
public void flushMemory() throws Exception {
boolean success = writeBuffer.flushMemory();
if (!success) {
- flushWriteBuffer(false);
+ flushWriteBuffer(false, false);
}
}
- private void flushWriteBuffer(boolean forcedFullCompaction) throws Exception {
+ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+ throws Exception {
if (writeBuffer.size() > 0) {
if (compactManager.shouldWaitCompaction()) {
- // stop writing, wait for compaction finished
- trySyncLatestCompaction(true);
+ waitForLatestCompaction = true;
}
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
@@ -194,12 +194,14 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
writeBuffer.clear();
}
- submitCompaction(forcedFullCompaction);
+
+ trySyncLatestCompaction(waitForLatestCompaction);
+ compactManager.triggerCompaction(forcedFullCompaction);
}
@Override
public CommitIncrement prepareCommit(boolean blocking) throws Exception {
- flushWriteBuffer(false);
+ flushWriteBuffer(false, false);
trySyncLatestCompaction(blocking || commitForceCompact);
return drainIncrement();
}
@@ -260,11 +262,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
compactChangelog.addAll(result.changelog());
}
- private void submitCompaction(boolean forcedFullCompaction) throws Exception {
- trySyncLatestCompaction(forcedFullCompaction);
- compactManager.triggerCompaction(forcedFullCompaction);
- }
-
private void trySyncLatestCompaction(boolean blocking) throws Exception {
Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 45107366..98212081 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -108,8 +108,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
@Override
- public void compact(BinaryRowData partition, int bucket) throws Exception {
- getWriter(partition, bucket).fullCompaction();
+ public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+ throws Exception {
+ getWriter(partition, bucket).compact(fullCompaction);
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 36f6c9a6..919a1eaf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -60,9 +60,10 @@ public interface FileStoreWrite<T> {
*
* @param partition the partition to compact
* @param bucket the bucket to compact
+ * @param fullCompaction whether to trigger full compaction or just normal compaction
* @throws Exception the thrown exception when compacting the records
*/
- void compact(BinaryRowData partition, int bucket) throws Exception;
+ void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
/**
* Prepare commit in the write.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index a055bee7..bedac510 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -34,10 +34,12 @@ public interface RecordWriter<T> {
void write(T record) throws Exception;
/**
- * Compact all files related to the writer. Note that compaction process is only submitted and
- * may not be completed when the method returns.
+ * Compact files related to the writer. Note that the compaction process is only submitted and
+ * may not be completed when the method returns.
+ *
+ * @param fullCompaction whether to trigger full compaction or just normal compaction
*/
- void fullCompaction() throws Exception;
+ void compact(boolean fullCompaction) throws Exception;
/**
* Prepare for a commit.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 6e449d14..dce0eae4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -39,7 +39,7 @@ public interface TableWrite extends AutoCloseable {
/** Log record need to preserve original pk (which includes partition fields). */
SinkRecord toLogRecord(SinkRecord record);
- void compact(BinaryRowData partition, int bucket) throws Exception;
+ void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 7cf96aea..fb3c9b20 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -70,8 +70,9 @@ public class TableWriteImpl<T> implements TableWrite {
}
@Override
- public void compact(BinaryRowData partition, int bucket) throws Exception {
- write.compact(partition, bucket);
+ public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+ throws Exception {
+ write.compact(partition, bucket, fullCompaction);
}
@Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 10a183f3..a3018088 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -168,7 +168,7 @@ public class TestCommitThread extends Thread {
for (BinaryRowData partition : writtenPartitions) {
MergeTreeWriter writer =
writers.computeIfAbsent(partition, p -> createWriter(p, false));
- writer.fullCompaction();
+ writer.compact(true);
RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
committable.addFileCommittable(
new FileCommittable(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index dd4116af..5178181e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -260,8 +260,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(2, 10, 210L));
write.write(rowData(2, 20, 220L));
write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
- write.compact(binaryRow(1), 0);
- write.compact(binaryRow(2), 0);
+ write.compact(binaryRow(1), 0, true);
+ write.compact(binaryRow(2), 0, true);
commit.commit(0, write.prepareCommit(true, 0));
List<Split> splits = table.newScan().withIncremental(true).plan().splits();
@@ -281,8 +281,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
write.write(rowData(2, 40, 241L));
- write.compact(binaryRow(1), 0);
- write.compact(binaryRow(2), 0);
+ write.compact(binaryRow(1), 0, true);
+ write.compact(binaryRow(2), 0, true);
commit.commit(2, write.prepareCommit(true, 2));
splits = table.newScan().withIncremental(true).plan().splits();
@@ -305,8 +305,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
write.write(rowData(2, 20, 221L));
write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 221L));
write.write(rowData(2, 40, 242L));
- write.compact(binaryRow(1), 0);
- write.compact(binaryRow(2), 0);
+ write.compact(binaryRow(1), 0, true);
+ write.compact(binaryRow(2), 0, true);
commit.commit(4, write.prepareCommit(true, 4));
splits = table.newScan().withIncremental(true).plan().splits();