You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/09 02:06:01 UTC

[flink-table-store] branch master updated: [FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8365182a [FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs
8365182a is described below

commit 8365182ae1b715508b3a24ed3911f2dfa244b17b
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jan 9 10:05:57 2023 +0800

    [FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs
    
    This closes #464
---
 docs/content/docs/maintenance/write-performance.md        |  4 ++--
 docs/layouts/shortcodes/generated/core_configuration.html |  4 ++--
 .../flink/table/store/connector/action/CompactAction.java |  2 +-
 .../flink/table/store/connector/sink/FlinkSink.java       |  2 +-
 .../table/store/connector/sink/StoreCompactOperator.java  |  5 ++---
 .../connector/ChangelogWithKeyFileStoreTableITCase.java   |  4 ++--
 .../table/store/connector/action/CompactActionITCase.java | 10 ++++++++--
 .../java/org/apache/flink/table/store/CoreOptions.java    | 13 ++++++++-----
 .../table/store/file/compact/NoopCompactManager.java      |  2 +-
 .../store/file/operation/AppendOnlyFileStoreWrite.java    |  2 +-
 .../store/file/operation/KeyValueFileStoreWrite.java      |  2 +-
 .../flink/table/store/table/AbstractFileStoreTable.java   |  3 ++-
 .../apache/flink/table/store/table/sink/TableCommit.java  | 15 +++++++++++----
 .../flink/table/store/table/FileStoreTableTestBase.java   | 12 +++++++++---
 .../flink/table/store/tests/FlinkActionsE2eTest.java      |  2 +-
 15 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/docs/content/docs/maintenance/write-performance.md b/docs/content/docs/maintenance/write-performance.md
index 708c57f0..556efeb4 100644
--- a/docs/content/docs/maintenance/write-performance.md
+++ b/docs/content/docs/maintenance/write-performance.md
@@ -134,11 +134,11 @@ To skip compactions in writers, set the following table property to `true`.
     </thead>
     <tbody>
     <tr>
-      <td><h5>write.compaction-skip</h5></td>
+      <td><h5>write-only</h5></td>
       <td>No</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
-      <td>Whether to skip compaction on write.</td>
+      <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
     </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 069444ad..feae87cf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -267,10 +267,10 @@
             <td>Specify the write mode for table.<br /><br />Possible values:<ul><li>"append-only": The table can only accept append-only insert operations. Neither data deduplication nor any primary key constraints will be done when inserting rows into table store.</li><li>"change-log": The table can accept insert/delete/update operations.</li></ul></td>
         </tr>
         <tr>
-            <td><h5>write.compaction-skip</h5></td>
+            <td><h5>write-only</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Whether to skip compaction on write.</td>
+            <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
         </tr>
     </tbody>
 </table>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
index 7b31df19..947056ad 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -49,7 +49,7 @@ public class CompactAction {
     CompactAction(Path tablePath) {
         Configuration tableOptions = new Configuration();
         tableOptions.set(CoreOptions.PATH, tablePath.toString());
-        tableOptions.set(CoreOptions.WRITE_COMPACTION_SKIP, false);
+        tableOptions.set(CoreOptions.WRITE_ONLY, false);
         FileStoreTable table = FileStoreTableFactory.create(tableOptions);
 
         sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index bad47854..60c7d65f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -58,7 +58,7 @@ public abstract class FlinkSink implements Serializable {
 
     protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
         if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
-                && !table.options().writeCompactionSkip()) {
+                && !table.options().writeOnly()) {
             long fullCompactionThresholdMs =
                     table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
             return (table, context, ioManager) ->
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index fdab6d1f..fc7ce5a1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -56,9 +56,8 @@ public class StoreCompactOperator extends PrepareCommitOperator {
             StoreSinkWrite.Provider storeSinkWriteProvider,
             boolean isStreaming) {
         Preconditions.checkArgument(
-                !table.options().writeCompactionSkip(),
-                CoreOptions.WRITE_COMPACTION_SKIP.key()
-                        + " should not be true for StoreCompactOperator.");
+                !table.options().writeOnly(),
+                CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
         this.isStreaming = isStreaming;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index 21a73a05..3a0c508f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -228,7 +228,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
                         + "  'bucket' = '1',"
                         + "  'changelog-producer' = 'full-compaction',"
                         + "  'changelog-producer.compaction-interval' = '2s',"
-                        + "  'write.compaction-skip' = 'true'"
+                        + "  'write-only' = 'true'"
                         + ")");
 
         // run select job
@@ -357,7 +357,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'full-compaction',"
                         + "'changelog-producer.compaction-interval' = '2s',"
-                        + "'write.compaction-skip' = 'true'");
+                        + "'write-only' = 'true'");
 
         // sleep for a random amount of time to check
         // if dedicated compactor job can find first snapshot to compact correctly
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 803ea251..643880c5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -86,7 +86,7 @@ public class CompactActionITCase extends AbstractTestBase {
     @Test(timeout = 60000)
     public void testBatchCompact() throws Exception {
         Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
         FileStoreTable table = createFileStoreTable(options);
         SnapshotManager snapshotManager = table.snapshotManager();
@@ -138,7 +138,10 @@ public class CompactActionITCase extends AbstractTestBase {
         options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
         options.put(CoreOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
         options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
-        options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
 
         FileStoreTable table = createFileStoreTable(options);
         SnapshotManager snapshotManager = table.snapshotManager();
@@ -211,6 +214,9 @@ public class CompactActionITCase extends AbstractTestBase {
                         "-U 1|100|15|20221208",
                         "-U 1|100|15|20221209"),
                 actual);
+
+        // assert dedicated compact job will expire snapshots
+        Assert.assertEquals(2L, (long) snapshotManager.earliestSnapshotId());
     }
 
     private List<Map<String, String>> getSpecifiedPartitions() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 368620d6..dbc419cf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -167,11 +167,14 @@ public class CoreOptions implements Serializable {
                     .defaultValue(WriteMode.CHANGE_LOG)
                     .withDescription("Specify the write mode for table.");
 
-    public static final ConfigOption<Boolean> WRITE_COMPACTION_SKIP =
-            ConfigOptions.key("write.compaction-skip")
+    public static final ConfigOption<Boolean> WRITE_ONLY =
+            ConfigOptions.key("write-only")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription("Whether to skip compaction on write.");
+                    .withDeprecatedKeys("write.compaction-skip")
+                    .withDescription(
+                            "If set to true, compactions and snapshot expiration will be skipped. "
+                                    + "This option is used along with dedicated compact jobs.");
 
     public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
             ConfigOptions.key("source.split.target-size")
@@ -590,8 +593,8 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_MODE);
     }
 
-    public boolean writeCompactionSkip() {
-        return options.get(WRITE_COMPACTION_SKIP);
+    public boolean writeOnly() {
+        return options.get(WRITE_ONLY);
     }
 
     /** Specifies the merge engine for table with primary key. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index db014c45..1fff0226 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -44,7 +44,7 @@ public class NoopCompactManager implements CompactManager {
                 !fullCompaction,
                 "NoopCompactManager does not support user triggered compaction.\n"
                         + "If you really need a guaranteed compaction, please set "
-                        + CoreOptions.WRITE_COMPACTION_SKIP.key()
+                        + CoreOptions.WRITE_ONLY.key()
                         + " property of this table to false.");
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 261b7c3f..9d086a21 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -77,7 +77,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
         this.compactionMinFileNum = options.compactionMinFileNum();
         this.compactionMaxFileNum = options.compactionMaxFileNum();
         this.commitForceCompact = options.commitForceCompact();
-        this.skipCompaction = options.writeCompactionSkip();
+        this.skipCompaction = options.writeOnly();
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 6e901453..acb8519a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -184,7 +184,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
             Levels levels) {
-        if (options.writeCompactionSkip()) {
+        if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
             Comparator<RowData> keyComparator = keyComparatorSupplier.get();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index ead3f0f5..83022584 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -106,6 +106,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
 
     @Override
     public TableCommit newCommit(String commitUser) {
-        return new TableCommit(store().newCommit(commitUser), store().newExpire());
+        return new TableCommit(
+                store().newCommit(commitUser), options().writeOnly() ? null : store().newExpire());
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index c763cc1e..dd6056d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -37,12 +37,12 @@ import java.util.Map;
 public class TableCommit implements AutoCloseable {
 
     private final FileStoreCommit commit;
-    private final FileStoreExpire expire;
+    @Nullable private final FileStoreExpire expire;
 
     @Nullable private Map<String, String> overwritePartition = null;
     @Nullable private Lock lock;
 
-    public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
+    public TableCommit(FileStoreCommit commit, @Nullable FileStoreExpire expire) {
         this.commit = commit;
         this.expire = expire;
     }
@@ -54,7 +54,11 @@ public class TableCommit implements AutoCloseable {
 
     public TableCommit withLock(Lock lock) {
         commit.withLock(lock);
-        expire.withLock(lock);
+
+        if (expire != null) {
+            expire.withLock(lock);
+        }
+
         this.lock = lock;
         return this;
     }
@@ -97,7 +101,10 @@ public class TableCommit implements AutoCloseable {
             }
             commit.overwrite(overwritePartition, committable, new HashMap<>());
         }
-        expire.expire();
+
+        if (expire != null) {
+            expire.expire();
+        }
     }
 
     @Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index a19a4e79..95a01374 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -70,7 +70,9 @@ import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
 import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
 import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
+import static org.apache.flink.table.store.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
+import static org.apache.flink.table.store.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.apache.flink.table.store.CoreOptions.WRITE_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link FileStoreTable}. */
@@ -317,12 +319,16 @@ public abstract class FileStoreTableTestBase {
     }
 
     @Test
-    public void testWriteWithoutCompaction() throws Exception {
+    public void testWriteWithoutCompactionAndExpiration() throws Exception {
         FileStoreTable table =
                 createFileStoreTable(
                         conf -> {
-                            conf.set(WRITE_COMPACTION_SKIP, true);
+                            conf.set(WRITE_ONLY, true);
                             conf.set(COMPACTION_MAX_FILE_NUM, 5);
+                            // the 'write-only' option also skips snapshot expiration,
+                            // so the retention options below shouldn't have any effect
+                            conf.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
+                            conf.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
                         });
 
         TableWrite write = table.newWrite(commitUser);
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 3d682efa..956d2051 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -77,7 +77,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                         + ") PARTITIONED BY (dt) WITH (\n"
                         + "    'changelog-producer' = 'full-compaction',\n"
                         + "    'changelog-producer.compaction-interval' = '1s',\n"
-                        + "    'write.compaction-skip' = 'true'\n"
+                        + "    'write-only' = 'true'\n"
                         + ");";
 
         // insert data into table store