Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/28 06:49:57 UTC

[flink-table-store] branch master updated: [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e6e62699 [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
e6e62699 is described below

commit e6e62699ef42a90d9eb2ca73b10460eb2764a586
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Nov 28 14:49:51 2022 +0800

    [FLINK-30205] Modify compact interface for TableWrite and FileStoreWrite to support normal compaction in Table Store
    
    This closes #403
---
 .../sink/FullChangelogStoreWriteOperator.java      |  2 +-
 .../store/connector/sink/StoreCompactOperator.java |  2 +-
 .../table/store/file/append/AppendOnlyWriter.java  | 18 ++++++----------
 .../store/file/mergetree/MergeTreeWriter.java      | 25 ++++++++++------------
 .../file/operation/AbstractFileStoreWrite.java     |  5 +++--
 .../table/store/file/operation/FileStoreWrite.java |  3 ++-
 .../flink/table/store/file/utils/RecordWriter.java |  8 ++++---
 .../flink/table/store/table/sink/TableWrite.java   |  2 +-
 .../table/store/table/sink/TableWriteImpl.java     |  5 +++--
 .../store/file/operation/TestCommitThread.java     |  2 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 12 +++++------
 11 files changed, 41 insertions(+), 43 deletions(-)
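
The caller-facing change is the extra boolean on TableWrite#compact and FileStoreWrite#compact. A minimal caller-side sketch, assuming an already-configured TableWrite plus the partition and bucket to compact; the class name, helper names and the commit identifier 0L are illustrative, only the compact/prepareCommit signatures come from this patch:

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.table.sink.TableWrite;

    public class CompactCallSketch {

        /** Force a full compaction of one bucket, as StoreCompactOperator now does. */
        static void fullCompact(TableWrite write, BinaryRowData partition, int bucket)
                throws Exception {
            write.compact(partition, bucket, true);
            // blocking = true: wait until the compaction result is ready to commit
            write.prepareCommit(true, 0L);
        }

        /** Request a normal (non-full) compaction, which this change makes possible. */
        static void normalCompact(TableWrite write, BinaryRowData partition, int bucket)
                throws Exception {
            write.compact(partition, bucket, false);
            // blocking = false: the compaction may still be running in the background
            write.prepareCommit(false, 0L);
        }
    }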

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
index b33aa4a6..59f392cf 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreWriteOperator.java
@@ -208,7 +208,7 @@ public class FullChangelogStoreWriteOperator extends StoreWriteOperator {
                     }
                     compactedBuckets.add(bucket);
                     try {
-                        write.compact(bucket.f0, bucket.f1);
+                        write.compact(bucket.f0, bucket.f1, true);
                     } catch (Exception e) {
                         throw new RuntimeException(e);
                     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index dc8f15d8..b331059c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -99,7 +99,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
                         bucket);
             }
             try {
-                write.compact(partition, bucket);
+                write.compact(partition, bucket, true);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index 27d34a9c..dfa94931 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -94,18 +94,19 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     }
 
     @Override
-    public void fullCompaction() throws Exception {
-        flushWriter(true);
+    public void compact(boolean fullCompaction) throws Exception {
+        flushWriter(true, fullCompaction);
     }
 
     @Override
     public CommitIncrement prepareCommit(boolean blocking) throws Exception {
-        flushWriter(false);
+        flushWriter(false, false);
         trySyncLatestCompaction(blocking || forceCompact);
         return drainIncrement();
     }
 
-    private void flushWriter(boolean forcedFullCompaction) throws Exception {
+    private void flushWriter(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+            throws Exception {
         List<DataFileMeta> flushedFiles = new ArrayList<>();
         if (writer != null) {
             writer.close();
@@ -119,7 +120,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
 
         // add new generated files
         flushedFiles.forEach(compactManager::addNewFile);
-        submitCompaction(forcedFullCompaction);
+        trySyncLatestCompaction(waitForLatestCompaction);
+        compactManager.triggerCompaction(forcedFullCompaction);
         newFiles.addAll(flushedFiles);
     }
 
@@ -145,12 +147,6 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
                 schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, seqNumCounter);
     }
 
-    private void submitCompaction(boolean forcedFullCompaction)
-            throws ExecutionException, InterruptedException {
-        trySyncLatestCompaction(forcedFullCompaction);
-        compactManager.triggerCompaction(forcedFullCompaction);
-    }
-
     private void trySyncLatestCompaction(boolean blocking)
             throws ExecutionException, InterruptedException {
         compactManager
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6945dec7..f17c8b0f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -130,7 +130,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                         : kv.sequenceNumber();
         boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
         if (!success) {
-            flushWriteBuffer(false);
+            flushWriteBuffer(false, false);
             success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
             if (!success) {
                 throw new RuntimeException("Mem table is too small to hold a single element.");
@@ -139,8 +139,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     }
 
     @Override
-    public void fullCompaction() throws Exception {
-        flushWriteBuffer(true);
+    public void compact(boolean fullCompaction) throws Exception {
+        flushWriteBuffer(true, fullCompaction);
     }
 
     @Override
@@ -152,15 +152,15 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     public void flushMemory() throws Exception {
         boolean success = writeBuffer.flushMemory();
         if (!success) {
-            flushWriteBuffer(false);
+            flushWriteBuffer(false, false);
         }
     }
 
-    private void flushWriteBuffer(boolean forcedFullCompaction) throws Exception {
+    private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+            throws Exception {
         if (writeBuffer.size() > 0) {
             if (compactManager.shouldWaitCompaction()) {
-                // stop writing, wait for compaction finished
-                trySyncLatestCompaction(true);
+                waitForLatestCompaction = true;
             }
 
             final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
@@ -194,12 +194,14 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
             writeBuffer.clear();
         }
-        submitCompaction(forcedFullCompaction);
+
+        trySyncLatestCompaction(waitForLatestCompaction);
+        compactManager.triggerCompaction(forcedFullCompaction);
     }
 
     @Override
     public CommitIncrement prepareCommit(boolean blocking) throws Exception {
-        flushWriteBuffer(false);
+        flushWriteBuffer(false, false);
         trySyncLatestCompaction(blocking || commitForceCompact);
         return drainIncrement();
     }
@@ -260,11 +262,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         compactChangelog.addAll(result.changelog());
     }
 
-    private void submitCompaction(boolean forcedFullCompaction) throws Exception {
-        trySyncLatestCompaction(forcedFullCompaction);
-        compactManager.triggerCompaction(forcedFullCompaction);
-    }
-
     private void trySyncLatestCompaction(boolean blocking) throws Exception {
         Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
         result.ifPresent(this::updateCompactResult);
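
With submitCompaction() removed, AppendOnlyWriter and MergeTreeWriter now share the same flush-then-compact flow. A condensed sketch of that flow (the method name flush() is illustrative; trySyncLatestCompaction and compactManager are the members shown in the hunks above):

    private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
            throws Exception {
        // 1. roll the in-memory buffer / current file writer into new data files (omitted)
        // 2. settle the previously submitted asynchronous compaction, blocking only when the
        //    caller asked for it; MergeTreeWriter additionally forces the wait when
        //    compactManager.shouldWaitCompaction() is true
        trySyncLatestCompaction(waitForLatestCompaction);
        // 3. submit the next compaction: true forces a full compaction, false asks the
        //    compact manager for a normal compaction
        compactManager.triggerCompaction(forcedFullCompaction);
    }

    // The entry points map onto it as follows:
    //   compact(fullCompaction)          -> flush(true, fullCompaction)
    //   prepareCommit(blocking)          -> flush(false, false), then trySyncLatestCompaction(blocking || force-compact flag)
    //   flushMemory() in MergeTreeWriter -> flush(false, false) when the write buffer cannot spill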
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 45107366..98212081 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -108,8 +108,9 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     }
 
     @Override
-    public void compact(BinaryRowData partition, int bucket) throws Exception {
-        getWriter(partition, bucket).fullCompaction();
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        getWriter(partition, bucket).compact(fullCompaction);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 36f6c9a6..919a1eaf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -60,9 +60,10 @@ public interface FileStoreWrite<T> {
      *
      * @param partition the partition to compact
      * @param bucket the bucket to compact
+     * @param fullCompaction whether to trigger full compaction or just normal compaction
      * @throws Exception the thrown exception when compacting the records
      */
-    void compact(BinaryRowData partition, int bucket) throws Exception;
+    void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
     /**
      * Prepare commit in the write.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index a055bee7..bedac510 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -34,10 +34,12 @@ public interface RecordWriter<T> {
     void write(T record) throws Exception;
 
     /**
-     * Compact all files related to the writer. Note that compaction process is only submitted and
-     * may not be completed when the method returns.
+     * Compact files related to the writer. Note that compaction process is only submitted and may
+     * not be completed when the method returns.
+     *
+     * @param fullCompaction whether to trigger full compaction or just normal compaction
      */
-    void fullCompaction() throws Exception;
+    void compact(boolean fullCompaction) throws Exception;
 
     /**
      * Prepare for a commit.
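
As the reworded javadoc notes, compact() only submits work and may return before the compaction finishes. The result is collected later, for example through a blocking prepareCommit (writer stands for any RecordWriter implementation; only calls visible in this patch are used):

    // submit a normal compaction; the call returns without waiting for it to finish
    writer.compact(false);
    // blocking = true makes prepareCommit wait for the pending compaction result, so the
    // returned increment can also carry the compaction's file changes
    RecordWriter.CommitIncrement inc = writer.prepareCommit(true);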
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 6e449d14..dce0eae4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -39,7 +39,7 @@ public interface TableWrite extends AutoCloseable {
     /** Log record need to preserve original pk (which includes partition fields). */
     SinkRecord toLogRecord(SinkRecord record);
 
-    void compact(BinaryRowData partition, int bucket) throws Exception;
+    void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
     List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier) throws Exception;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 7cf96aea..fb3c9b20 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -70,8 +70,9 @@ public class TableWriteImpl<T> implements TableWrite {
     }
 
     @Override
-    public void compact(BinaryRowData partition, int bucket) throws Exception {
-        write.compact(partition, bucket);
+    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
+            throws Exception {
+        write.compact(partition, bucket, fullCompaction);
     }
 
     @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 10a183f3..a3018088 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -168,7 +168,7 @@ public class TestCommitThread extends Thread {
                 for (BinaryRowData partition : writtenPartitions) {
                     MergeTreeWriter writer =
                             writers.computeIfAbsent(partition, p -> createWriter(p, false));
-                    writer.fullCompaction();
+                    writer.compact(true);
                     RecordWriter.CommitIncrement inc = writer.prepareCommit(true);
                     committable.addFileCommittable(
                             new FileCommittable(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index dd4116af..5178181e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -260,8 +260,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(2, 10, 210L));
         write.write(rowData(2, 20, 220L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
-        write.compact(binaryRow(1), 0);
-        write.compact(binaryRow(2), 0);
+        write.compact(binaryRow(1), 0, true);
+        write.compact(binaryRow(2), 0, true);
         commit.commit(0, write.prepareCommit(true, 0));
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits();
@@ -281,8 +281,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
         write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
         write.write(rowData(2, 40, 241L));
-        write.compact(binaryRow(1), 0);
-        write.compact(binaryRow(2), 0);
+        write.compact(binaryRow(1), 0, true);
+        write.compact(binaryRow(2), 0, true);
         commit.commit(2, write.prepareCommit(true, 2));
 
         splits = table.newScan().withIncremental(true).plan().splits();
@@ -305,8 +305,8 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(2, 20, 221L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 221L));
         write.write(rowData(2, 40, 242L));
-        write.compact(binaryRow(1), 0);
-        write.compact(binaryRow(2), 0);
+        write.compact(binaryRow(1), 0, true);
+        write.compact(binaryRow(2), 0, true);
         commit.commit(4, write.prepareCommit(true, 4));
 
         splits = table.newScan().withIncremental(true).plan().splits();