You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/09 02:06:01 UTC
[flink-table-store] branch master updated: [FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 8365182a [FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs
8365182a is described below
commit 8365182ae1b715508b3a24ed3911f2dfa244b17b
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Jan 9 10:05:57 2023 +0800
[FLINK-30589] Snapshot expiration should be skipped in Table Store dedicated writer jobs
This closes #464
---
docs/content/docs/maintenance/write-performance.md | 4 ++--
docs/layouts/shortcodes/generated/core_configuration.html | 4 ++--
.../flink/table/store/connector/action/CompactAction.java | 2 +-
.../flink/table/store/connector/sink/FlinkSink.java | 2 +-
.../table/store/connector/sink/StoreCompactOperator.java | 5 ++---
.../connector/ChangelogWithKeyFileStoreTableITCase.java | 4 ++--
.../table/store/connector/action/CompactActionITCase.java | 10 ++++++++--
.../java/org/apache/flink/table/store/CoreOptions.java | 13 ++++++++-----
.../table/store/file/compact/NoopCompactManager.java | 2 +-
.../store/file/operation/AppendOnlyFileStoreWrite.java | 2 +-
.../store/file/operation/KeyValueFileStoreWrite.java | 2 +-
.../flink/table/store/table/AbstractFileStoreTable.java | 3 ++-
.../apache/flink/table/store/table/sink/TableCommit.java | 15 +++++++++++----
.../flink/table/store/table/FileStoreTableTestBase.java | 12 +++++++++---
.../flink/table/store/tests/FlinkActionsE2eTest.java | 2 +-
15 files changed, 52 insertions(+), 30 deletions(-)
diff --git a/docs/content/docs/maintenance/write-performance.md b/docs/content/docs/maintenance/write-performance.md
index 708c57f0..556efeb4 100644
--- a/docs/content/docs/maintenance/write-performance.md
+++ b/docs/content/docs/maintenance/write-performance.md
@@ -134,11 +134,11 @@ To skip compactions in writers, set the following table property to `true`.
</thead>
<tbody>
<tr>
- <td><h5>write.compaction-skip</h5></td>
+ <td><h5>write-only</h5></td>
<td>No</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether to skip compaction on write.</td>
+ <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 069444ad..feae87cf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -267,10 +267,10 @@
<td>Specify the write mode for table.<br /><br />Possible values:<ul><li>"append-only": The table can only accept append-only insert operations. Neither data deduplication nor any primary key constraints will be done when inserting rows into table store.</li><li>"change-log": The table can accept insert/delete/update operations.</li></ul></td>
</tr>
<tr>
- <td><h5>write.compaction-skip</h5></td>
+ <td><h5>write-only</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether to skip compaction on write.</td>
+ <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
</tr>
</tbody>
</table>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
index 7b31df19..947056ad 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -49,7 +49,7 @@ public class CompactAction {
CompactAction(Path tablePath) {
Configuration tableOptions = new Configuration();
tableOptions.set(CoreOptions.PATH, tablePath.toString());
- tableOptions.set(CoreOptions.WRITE_COMPACTION_SKIP, false);
+ tableOptions.set(CoreOptions.WRITE_ONLY, false);
FileStoreTable table = FileStoreTableFactory.create(tableOptions);
sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index bad47854..60c7d65f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -58,7 +58,7 @@ public abstract class FlinkSink implements Serializable {
protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
- && !table.options().writeCompactionSkip()) {
+ && !table.options().writeOnly()) {
long fullCompactionThresholdMs =
table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
return (table, context, ioManager) ->
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index fdab6d1f..fc7ce5a1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -56,9 +56,8 @@ public class StoreCompactOperator extends PrepareCommitOperator {
StoreSinkWrite.Provider storeSinkWriteProvider,
boolean isStreaming) {
Preconditions.checkArgument(
- !table.options().writeCompactionSkip(),
- CoreOptions.WRITE_COMPACTION_SKIP.key()
- + " should not be true for StoreCompactOperator.");
+ !table.options().writeOnly(),
+ CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.isStreaming = isStreaming;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index 21a73a05..3a0c508f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -228,7 +228,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
+ " 'bucket' = '1',"
+ " 'changelog-producer' = 'full-compaction',"
+ " 'changelog-producer.compaction-interval' = '2s',"
- + " 'write.compaction-skip' = 'true'"
+ + " 'write-only' = 'true'"
+ ")");
// run select job
@@ -357,7 +357,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
random.nextBoolean() ? "512kb" : "1mb")
+ "'changelog-producer' = 'full-compaction',"
+ "'changelog-producer.compaction-interval' = '2s',"
- + "'write.compaction-skip' = 'true'");
+ + "'write-only' = 'true'");
// sleep for a random amount of time to check
// if dedicated compactor job can find first snapshot to compact correctly
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 803ea251..643880c5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -86,7 +86,7 @@ public class CompactActionITCase extends AbstractTestBase {
@Test(timeout = 60000)
public void testBatchCompact() throws Exception {
Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
FileStoreTable table = createFileStoreTable(options);
SnapshotManager snapshotManager = table.snapshotManager();
@@ -138,7 +138,10 @@ public class CompactActionITCase extends AbstractTestBase {
options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
options.put(CoreOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
- options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ // test that dedicated compact job will expire snapshots
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
FileStoreTable table = createFileStoreTable(options);
SnapshotManager snapshotManager = table.snapshotManager();
@@ -211,6 +214,9 @@ public class CompactActionITCase extends AbstractTestBase {
"-U 1|100|15|20221208",
"-U 1|100|15|20221209"),
actual);
+
+ // assert dedicated compact job will expire snapshots
+ Assert.assertEquals(2L, (long) snapshotManager.earliestSnapshotId());
}
private List<Map<String, String>> getSpecifiedPartitions() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 368620d6..dbc419cf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -167,11 +167,14 @@ public class CoreOptions implements Serializable {
.defaultValue(WriteMode.CHANGE_LOG)
.withDescription("Specify the write mode for table.");
- public static final ConfigOption<Boolean> WRITE_COMPACTION_SKIP =
- ConfigOptions.key("write.compaction-skip")
+ public static final ConfigOption<Boolean> WRITE_ONLY =
+ ConfigOptions.key("write-only")
.booleanType()
.defaultValue(false)
- .withDescription("Whether to skip compaction on write.");
+ .withDeprecatedKeys("write.compaction-skip")
+ .withDescription(
+ "If set to true, compactions and snapshot expiration will be skipped. "
+ + "This option is used along with dedicated compact jobs.");
public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
ConfigOptions.key("source.split.target-size")
@@ -590,8 +593,8 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_MODE);
}
- public boolean writeCompactionSkip() {
- return options.get(WRITE_COMPACTION_SKIP);
+ public boolean writeOnly() {
+ return options.get(WRITE_ONLY);
}
/** Specifies the merge engine for table with primary key. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index db014c45..1fff0226 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -44,7 +44,7 @@ public class NoopCompactManager implements CompactManager {
!fullCompaction,
"NoopCompactManager does not support user triggered compaction.\n"
+ "If you really need a guaranteed compaction, please set "
- + CoreOptions.WRITE_COMPACTION_SKIP.key()
+ + CoreOptions.WRITE_ONLY.key()
+ " property of this table to false.");
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 261b7c3f..9d086a21 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -77,7 +77,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
this.compactionMinFileNum = options.compactionMinFileNum();
this.compactionMaxFileNum = options.compactionMaxFileNum();
this.commitForceCompact = options.commitForceCompact();
- this.skipCompaction = options.writeCompactionSkip();
+ this.skipCompaction = options.writeOnly();
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 6e901453..acb8519a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -184,7 +184,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels) {
- if (options.writeCompactionSkip()) {
+ if (options.writeOnly()) {
return new NoopCompactManager();
} else {
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index ead3f0f5..83022584 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -106,6 +106,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public TableCommit newCommit(String commitUser) {
- return new TableCommit(store().newCommit(commitUser), store().newExpire());
+ return new TableCommit(
+ store().newCommit(commitUser), options().writeOnly() ? null : store().newExpire());
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index c763cc1e..dd6056d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -37,12 +37,12 @@ import java.util.Map;
public class TableCommit implements AutoCloseable {
private final FileStoreCommit commit;
- private final FileStoreExpire expire;
+ @Nullable private final FileStoreExpire expire;
@Nullable private Map<String, String> overwritePartition = null;
@Nullable private Lock lock;
- public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
+ public TableCommit(FileStoreCommit commit, @Nullable FileStoreExpire expire) {
this.commit = commit;
this.expire = expire;
}
@@ -54,7 +54,11 @@ public class TableCommit implements AutoCloseable {
public TableCommit withLock(Lock lock) {
commit.withLock(lock);
- expire.withLock(lock);
+
+ if (expire != null) {
+ expire.withLock(lock);
+ }
+
this.lock = lock;
return this;
}
@@ -97,7 +101,10 @@ public class TableCommit implements AutoCloseable {
}
commit.overwrite(overwritePartition, committable, new HashMap<>());
}
- expire.expire();
+
+ if (expire != null) {
+ expire.expire();
+ }
}
@Override
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index a19a4e79..95a01374 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -70,7 +70,9 @@ import static org.apache.flink.table.store.CoreOptions.BUCKET;
import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
import static org.apache.flink.table.store.CoreOptions.FILE_FORMAT;
-import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
+import static org.apache.flink.table.store.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
+import static org.apache.flink.table.store.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.apache.flink.table.store.CoreOptions.WRITE_ONLY;
import static org.assertj.core.api.Assertions.assertThat;
/** Base test class for {@link FileStoreTable}. */
@@ -317,12 +319,16 @@ public abstract class FileStoreTableTestBase {
}
@Test
- public void testWriteWithoutCompaction() throws Exception {
+ public void testWriteWithoutCompactionAndExpiration() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
- conf.set(WRITE_COMPACTION_SKIP, true);
+ conf.set(WRITE_ONLY, true);
conf.set(COMPACTION_MAX_FILE_NUM, 5);
+ // the 'write-only' option will also skip expiration
+ // these options shouldn't have any effect
+ conf.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
+ conf.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
});
TableWrite write = table.newWrite(commitUser);
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 3d682efa..956d2051 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -77,7 +77,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
+ ") PARTITIONED BY (dt) WITH (\n"
+ " 'changelog-producer' = 'full-compaction',\n"
+ " 'changelog-producer.compaction-interval' = '1s',\n"
- + " 'write.compaction-skip' = 'true'\n"
+ + " 'write-only' = 'true'\n"
+ ");";
// insert data into table store