You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/06 07:44:38 UTC
[flink-table-store] branch master updated: [FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new dadd735d [FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store
dadd735d is described below
commit dadd735d362d355d60b737bf2c9669cb2fc880f6
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Dec 6 15:44:34 2022 +0800
[FLINK-30294] Change table property key 'log.scan' to 'scan.mode' and add a default startup mode in Table Store
This closes #422.
---
docs/content/docs/development/streaming-query.md | 34 +++++++--
.../shortcodes/generated/core_configuration.html | 24 +++---
.../store/connector/source/FlinkSourceBuilder.java | 9 ++-
.../ComputedColumnAndWatermarkTableITCase.java | 12 +--
.../store/connector/ReadWriteTableTestBase.java | 12 +--
.../org/apache/flink/table/store/CoreOptions.java | 89 +++++++++++++++-------
.../store/table/source/TableStreamingReader.java | 2 +-
.../snapshot/DataFileSnapshotEnumerator.java | 12 +--
.../source/snapshot/DeltaSnapshotEnumerator.java | 2 +-
.../FullCompactionChangelogSnapshotEnumerator.java | 2 +-
.../snapshot/InputChangelogSnapshotEnumerator.java | 2 +-
.../apache/flink/table/store/CoreOptionsTest.java | 53 +++++++++++++
.../table/ChangelogWithKeyFileStoreTableTest.java | 2 +-
.../snapshot/DeltaSnapshotEnumeratorTest.java | 6 +-
...lCompactionChangelogSnapshotEnumeratorTest.java | 6 +-
.../InputChangelogSnapshotEnumeratorTest.java | 6 +-
.../table/store/kafka/KafkaLogSourceProvider.java | 6 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 9 ++-
18 files changed, 199 insertions(+), 89 deletions(-)
diff --git a/docs/content/docs/development/streaming-query.md b/docs/content/docs/development/streaming-query.md
index e7c0208b..8acf9531 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -42,10 +42,10 @@ SELECT * FROM MyTable;
-- Streaming mode, streaming reading, read latest incremental
SET 'execution.runtime-mode' = 'streaming';
-SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;
+SELECT * FROM MyTable /*+ OPTIONS ('scan.mode'='latest') */;
```
-Different `log.scan` mode will result in different consuming behavior under streaming mode.
+Different `scan.mode` will result in different consuming behavior under streaming mode.
<table class="table table-bordered">
<thead>
<tr>
@@ -56,14 +56,36 @@ Different `log.scan` mode will result in different consuming behavior under stre
</thead>
<tbody>
<tr>
- <td><h5>FULL</h5></td>
+ <td><h5>default</h5></td>
<td>Yes</td>
- <td>FULL scan mode performs a hybrid reading with a snapshot scan and the streaming incremental scan.</td>
+ <td>
+ Determines the actual startup mode according to other table properties.
+ If "scan.timestamp-millis" is set, the actual startup mode will be "from-timestamp".
+ Otherwise, the actual startup mode will be "full".
+ </td>
</tr>
<tr>
- <td><h5>LATEST</h5></td>
+ <td><h5>full</h5></td>
<td>No</td>
- <td>LATEST scan mode only reads incremental data from the latest offset.</td>
+ <td>
+ Produces a snapshot on the table upon first startup,
+ and continues to read the latest changes.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>latest</h5></td>
+ <td>No</td>
+ <td>
+ Continuously reads the latest changes without producing a snapshot at the beginning.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>from-timestamp</h5></td>
+ <td>No</td>
+ <td>
+ Continuously reads changes starting from the timestamp specified by "scan.timestamp-millis",
+ without producing a snapshot at the beginning.
+ </td>
</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 1f7401ad..14c721f3 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -122,24 +122,12 @@
<td>Duration</td>
<td>It means how long changes log will be kept. The default value is from the log system cluster.</td>
</tr>
- <tr>
- <td><h5>log.scan</h5></td>
- <td style="word-wrap: break-word;">full</td>
- <td><p>Enum</p></td>
- <td>Specify the startup mode for log consumer.<br /><br />Possible values:<ul><li>"full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.</li><li>"latest": Start from the latest.</li><li>"from-timestamp": Start from user-supplied timestamp.</li></ul></td>
- </tr>
<tr>
<td><h5>log.scan.remove-normalize</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
</tr>
- <tr>
- <td><h5>log.scan.timestamp-millis</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>Long</td>
- <td>Optional timestamp used in case of "from-timestamp" scan mode</td>
- </tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
@@ -206,6 +194,18 @@
<td>Boolean</td>
<td>Whether to read compacted snapshot only.</td>
</tr>
+ <tr>
+ <td><h5>scan.mode</h5></td>
+ <td style="word-wrap: break-word;">default</td>
+ <td><p>Enum</p></td>
+ <td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp". Otherwise the actual startup mode will be "full".</li><li>"full": For streaming sources, produces a snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot but doe [...]
+ </tr>
+ <tr>
+ <td><h5>scan.timestamp-millis</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional timestamp used in case of "from-timestamp" scan mode.</td>
+ </tr>
<tr>
<td><h5>sequence.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 57b544e4..b2495234 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -29,8 +29,9 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.MergeEngine;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.table.FileStoreTable;
@@ -46,7 +47,6 @@ import java.util.Optional;
import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
@@ -146,11 +146,12 @@ public class FlinkSourceBuilder {
+ "You can use full compaction changelog producer to support streaming reading.");
}
- LogStartupMode startupMode = conf.get(LOG_SCAN);
+ // TODO visit all options through CoreOptions
+ StartupMode startupMode = CoreOptions.startupMode(conf);
if (logSourceProvider == null) {
return buildContinuousFileSource();
} else {
- if (startupMode != LogStartupMode.FULL) {
+ if (startupMode != StartupMode.FULL) {
return logSourceProvider.createSource(null);
}
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
index febc784d..80199c87 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
import org.junit.Test;
@@ -29,7 +29,7 @@ import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
+import static org.apache.flink.table.store.CoreOptions.SCAN_MODE;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates;
import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp;
@@ -146,7 +146,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
null,
false,
Collections.singletonMap(
- LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+ SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
"rate_by_to_currency IS NULL",
Arrays.asList(
"corrected_rate_by_to_currency",
@@ -190,7 +190,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+ SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Collections.emptyList(), // projection
Collections.singletonList(
@@ -213,7 +213,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+ SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
lateEventFilter.replaceAll("ts", "ts1"),
Arrays.asList("currency", "rate", "ts1"),
Collections.singletonList(
@@ -237,7 +237,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas
WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"),
false,
Collections.singletonMap(
- LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()),
+ SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase()),
lateEventFilter,
Arrays.asList(
"currency",
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 1da0c1ed..41867850 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
@@ -49,8 +49,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
+import static org.apache.flink.table.store.CoreOptions.SCAN_MODE;
+import static org.apache.flink.table.store.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath;
@@ -232,7 +232,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
- hints.put(LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase());
+ hints.put(SCAN_MODE.key(), StartupMode.LATEST.name().toLowerCase());
collectAndCheckUnderSameEnv(
true,
true,
@@ -258,8 +258,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
List<Row> expected)
throws Exception {
Map<String, String> hints = new HashMap<>();
- hints.put(LOG_SCAN.key(), "from-timestamp");
- hints.put(LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp));
+ hints.put(SCAN_MODE.key(), "from-timestamp");
+ hints.put(SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp));
collectAndCheckUnderSameEnv(
true,
true,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 08bad53b..ed5b1474 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
@@ -327,18 +328,20 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");
- public static final ConfigOption<LogStartupMode> LOG_SCAN =
- ConfigOptions.key("log.scan")
- .enumType(LogStartupMode.class)
- .defaultValue(LogStartupMode.FULL)
- .withDescription("Specify the startup mode for log consumer.");
+ public static final ConfigOption<StartupMode> SCAN_MODE =
+ ConfigOptions.key("scan.mode")
+ .enumType(StartupMode.class)
+ .defaultValue(StartupMode.DEFAULT)
+ .withDeprecatedKeys("log.scan")
+ .withDescription("Specify the scanning behavior of the source.");
- public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS =
- ConfigOptions.key("log.scan.timestamp-millis")
+ public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLIS =
+ ConfigOptions.key("scan.timestamp-millis")
.longType()
.noDefaultValue()
+ .withDeprecatedKeys("log.scan.timestamp-millis")
.withDescription(
- "Optional timestamp used in case of \"from-timestamp\" scan mode");
+ "Optional timestamp used in case of \"from-timestamp\" scan mode.");
public static final ConfigOption<Duration> LOG_RETENTION =
ConfigOptions.key("log.retention")
@@ -526,12 +529,25 @@ public class CoreOptions implements Serializable {
return options.get(CHANGELOG_PRODUCER);
}
- public LogStartupMode logStartupMode() {
- return options.get(LOG_SCAN);
+ public StartupMode startupMode() {
+ return startupMode(options);
+ }
+
+ public static StartupMode startupMode(ReadableConfig options) {
+ StartupMode mode = options.get(SCAN_MODE);
+ if (mode == StartupMode.DEFAULT) {
+ if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
+ return StartupMode.FROM_TIMESTAMP;
+ } else {
+ return StartupMode.FULL;
+ }
+ } else {
+ return mode;
+ }
}
public Long logScanTimestampMills() {
- return options.get(LOG_SCAN_TIMESTAMP_MILLS);
+ return options.get(SCAN_TIMESTAMP_MILLIS);
}
public Duration changelogProducerFullCompactionTriggerInterval() {
@@ -582,20 +598,37 @@ public class CoreOptions implements Serializable {
}
/** Specifies the startup mode for log consumer. */
- public enum LogStartupMode implements DescribedEnum {
+ public enum StartupMode implements DescribedEnum {
+ DEFAULT(
+ "deafult",
+ "Determines actual startup mode according to other table properties. "
+ + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\". "
+ + "Otherwise the actual startup mode will be \"full\"."),
+
FULL(
"full",
- "Perform a snapshot on the table upon first startup,"
- + " and continue to read the latest changes."),
-
- LATEST("latest", "Start from the latest."),
-
- FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+ "For streaming sources, produces a snapshot on the table upon first startup,"
+ + " and continue to read the latest changes. "
+ + "For batch sources, just produce a snapshot but does not read new changes."),
+
+ LATEST(
+ "latest",
+ "For streaming sources, continuously reads latest changes "
+ + "without producing a snapshot at the beginning. "
+ + "For batch sources, behaves the same as the \"full\" startup mode."),
+
+ FROM_TIMESTAMP(
+ "from-timestamp",
+ "For streaming sources, continuously reads changes "
+ + "starting from timestamp specified by \"scan.timestamp-millis\", "
+ + "without producing a snapshot at the beginning. "
+ + "For batch sources, produces a snapshot at timestamp specified by \"scan.timestamp-millis\" "
+ + "but does not read new changes.");
private final String value;
private final String description;
- LogStartupMode(String value, String description) {
+ StartupMode(String value, String description) {
this.value = value;
this.description = description;
}
@@ -707,8 +740,8 @@ public class CoreOptions implements Serializable {
* @param options the options to set default values
*/
public static void setDefaultValues(Configuration options) {
- if (options.contains(LOG_SCAN_TIMESTAMP_MILLS) && !options.contains(LOG_SCAN)) {
- options.set(LOG_SCAN, LogStartupMode.FROM_TIMESTAMP);
+ if (options.contains(SCAN_TIMESTAMP_MILLIS) && !options.contains(SCAN_MODE)) {
+ options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
}
}
@@ -721,22 +754,22 @@ public class CoreOptions implements Serializable {
*/
public static void validateTableSchema(TableSchema schema) {
CoreOptions options = new CoreOptions(schema.options());
- if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
+ if (options.startupMode() == StartupMode.FROM_TIMESTAMP) {
Preconditions.checkArgument(
options.logScanTimestampMills() != null,
String.format(
"%s can not be null when you use %s for %s",
- LOG_SCAN_TIMESTAMP_MILLS.key(),
- LogStartupMode.FROM_TIMESTAMP,
- LOG_SCAN.key()));
+ SCAN_TIMESTAMP_MILLIS.key(),
+ StartupMode.FROM_TIMESTAMP,
+ SCAN_MODE.key()));
} else {
Preconditions.checkArgument(
options.logScanTimestampMills() == null,
String.format(
"%s should be %s when you set %s",
- LOG_SCAN.key(),
- LogStartupMode.FROM_TIMESTAMP,
- LOG_SCAN_TIMESTAMP_MILLS.key()));
+ SCAN_MODE.key(),
+ StartupMode.FROM_TIMESTAMP,
+ SCAN_TIMESTAMP_MILLIS.key()));
}
Preconditions.checkArgument(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
index be2d05e5..b875d220 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -89,7 +89,7 @@ public class TableStreamingReader {
}
enumerator =
new DeltaSnapshotEnumerator(
- table.location(), scan, CoreOptions.LogStartupMode.FULL, null, null);
+ table.location(), scan, CoreOptions.StartupMode.FULL, null, null);
}
@Nullable
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
index c345eac5..54d201dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java
@@ -41,7 +41,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
private final SnapshotManager snapshotManager;
private final DataTableScan scan;
- private final CoreOptions.LogStartupMode startupMode;
+ private final CoreOptions.StartupMode startupMode;
private @Nullable final Long startupMillis;
private @Nullable Long nextSnapshotId;
@@ -49,7 +49,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
public DataFileSnapshotEnumerator(
Path tablePath,
DataTableScan scan,
- CoreOptions.LogStartupMode startupMode,
+ CoreOptions.StartupMode startupMode,
@Nullable Long startupMillis,
@Nullable Long nextSnapshotId) {
this.snapshotManager = new SnapshotManager(tablePath);
@@ -86,9 +86,9 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
startupMillis,
String.format(
"%s can not be null when you use %s for %s",
- CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
- CoreOptions.LOG_SCAN.key()));
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+ CoreOptions.StartupMode.FROM_TIMESTAMP,
+ CoreOptions.SCAN_MODE.key()));
startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
break;
@@ -133,7 +133,7 @@ public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
public static DataFileSnapshotEnumerator create(
FileStoreTable table, DataTableScan scan, Long nextSnapshotId) {
Path location = table.location();
- CoreOptions.LogStartupMode startupMode = table.options().logStartupMode();
+ CoreOptions.StartupMode startupMode = table.options().startupMode();
Long startupMillis = table.options().logScanTimestampMills();
switch (table.options().changelogProducer()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
index 857ef88d..36642ff8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumerator.java
@@ -40,7 +40,7 @@ public class DeltaSnapshotEnumerator extends DataFileSnapshotEnumerator {
public DeltaSnapshotEnumerator(
Path tablePath,
DataTableScan scan,
- CoreOptions.LogStartupMode startupMode,
+ CoreOptions.StartupMode startupMode,
@Nullable Long startupMillis,
@Nullable Long nextSnapshotId) {
super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
index 85c90f72..ed9e5f16 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumerator.java
@@ -46,7 +46,7 @@ public class FullCompactionChangelogSnapshotEnumerator extends DataFileSnapshotE
Path tablePath,
DataTableScan scan,
int maxLevel,
- CoreOptions.LogStartupMode startupMode,
+ CoreOptions.StartupMode startupMode,
@Nullable Long startupMillis,
@Nullable Long nextSnapshotId) {
super(tablePath, scan.withLevel(maxLevel), startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
index 03f221bf..fffe71f4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumerator.java
@@ -44,7 +44,7 @@ public class InputChangelogSnapshotEnumerator extends DataFileSnapshotEnumerator
public InputChangelogSnapshotEnumerator(
Path tablePath,
DataTableScan scan,
- CoreOptions.LogStartupMode startupMode,
+ CoreOptions.StartupMode startupMode,
@Nullable Long startupMillis,
Long nextSnapshotId) {
super(tablePath, scan, startupMode, startupMillis, nextSnapshotId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
new file mode 100644
index 00000000..f7c91231
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/CoreOptionsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CoreOptions}. */
+public class CoreOptionsTest {
+
+ @Test
+ public void testDefaultStartupMode() {
+ Configuration conf = new Configuration();
+ assertThat(conf.get(CoreOptions.SCAN_MODE)).isEqualTo(CoreOptions.StartupMode.DEFAULT);
+ assertThat(new CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.FULL);
+
+ conf = new Configuration();
+ conf.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, System.currentTimeMillis());
+ assertThat(new CoreOptions(conf).startupMode())
+ .isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
+ }
+
+ @Test
+ public void testStartupModeCompatibility() {
+ Configuration conf = new Configuration();
+ conf.setString("log.scan", "latest");
+ assertThat(new CoreOptions(conf).startupMode()).isEqualTo(CoreOptions.StartupMode.LATEST);
+
+ conf = new Configuration();
+ conf.setString("log.scan.timestamp-millis", String.valueOf(System.currentTimeMillis()));
+ assertThat(new CoreOptions(conf).startupMode())
+ .isEqualTo(CoreOptions.StartupMode.FROM_TIMESTAMP);
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 8342bbd8..aae4a784 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -374,7 +374,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
SnapshotEnumerator enumerator =
new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, 1L);
+ tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, 1L);
FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
i -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
index 65fe9f53..3252d022 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/DeltaSnapshotEnumeratorTest.java
@@ -44,7 +44,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
TableCommit commit = table.newCommit(commitUser);
SnapshotEnumerator enumerator =
new DeltaSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, null);
+ tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
// first call without any snapshot, should return null
assertThat(enumerator.enumerate()).isNull();
@@ -133,7 +133,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
new DeltaSnapshotEnumerator(
tablePath,
table.newScan(),
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ CoreOptions.StartupMode.FROM_TIMESTAMP,
startMillis,
null);
@@ -192,7 +192,7 @@ public class DeltaSnapshotEnumeratorTest extends DataFileSnapshotEnumeratorTestB
SnapshotEnumerator enumerator =
new DeltaSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.LogStartupMode.LATEST, null, null);
+ tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
DataTableScan.DataFilePlan plan = enumerator.enumerate();
assertThat(plan.snapshotId).isEqualTo(2);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
index 72531f4f..9a3e890f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/FullCompactionChangelogSnapshotEnumeratorTest.java
@@ -48,7 +48,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
tablePath,
table.newScan(),
table.options().numLevels() - 1,
- CoreOptions.LogStartupMode.FULL,
+ CoreOptions.StartupMode.FULL,
null,
null);
@@ -148,7 +148,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
tablePath,
table.newScan(),
table.options().numLevels() - 1,
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ CoreOptions.StartupMode.FROM_TIMESTAMP,
startMillis,
null);
@@ -206,7 +206,7 @@ public class FullCompactionChangelogSnapshotEnumeratorTest
tablePath,
table.newScan(),
table.options().numLevels() - 1,
- CoreOptions.LogStartupMode.LATEST,
+ CoreOptions.StartupMode.LATEST,
null,
null);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
index 5760110c..2c2fc091 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/InputChangelogSnapshotEnumeratorTest.java
@@ -44,7 +44,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
TableCommit commit = table.newCommit(commitUser);
SnapshotEnumerator enumerator =
new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.LogStartupMode.FULL, null, null);
+ tablePath, table.newScan(), CoreOptions.StartupMode.FULL, null, null);
// first call without any snapshot, should return null
assertThat(enumerator.enumerate()).isNull();
@@ -134,7 +134,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
new InputChangelogSnapshotEnumerator(
tablePath,
table.newScan(),
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ CoreOptions.StartupMode.FROM_TIMESTAMP,
startMillis,
null);
@@ -194,7 +194,7 @@ public class InputChangelogSnapshotEnumeratorTest extends DataFileSnapshotEnumer
SnapshotEnumerator enumerator =
new InputChangelogSnapshotEnumerator(
- tablePath, table.newScan(), CoreOptions.LogStartupMode.LATEST, null, null);
+ tablePath, table.newScan(), CoreOptions.StartupMode.LATEST, null, null);
DataTableScan.DataFilePlan plan = enumerator.enumerate();
assertThat(plan.snapshotId).isEqualTo(2);
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index c1bcc617..ba3ab9e8 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -25,7 +25,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions.LogConsistency;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
+import org.apache.flink.table.store.CoreOptions.StartupMode;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.types.DataType;
@@ -61,7 +61,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
private final LogConsistency consistency;
- private final LogStartupMode scanMode;
+ private final StartupMode scanMode;
@Nullable private final Long timestampMills;
@@ -74,7 +74,7 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
DeserializationSchema<RowData> valueDeserializer,
@Nullable int[][] projectFields,
LogConsistency consistency,
- LogStartupMode scanMode,
+ StartupMode scanMode,
@Nullable Long timestampMills) {
this.topic = topic;
this.properties = properties;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index d05a06cc..6a76e2eb 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -58,9 +59,8 @@ import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHel
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS;
import static org.apache.flink.table.store.CoreOptions.LogConsistency;
+import static org.apache.flink.table.store.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
@@ -198,8 +198,9 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
valueDeserializer,
projectFields,
helper.getOptions().get(LOG_CONSISTENCY),
- helper.getOptions().get(LOG_SCAN),
- helper.getOptions().get(LOG_SCAN_TIMESTAMP_MILLS));
+ // TODO visit all options through CoreOptions
+ CoreOptions.startupMode(helper.getOptions()),
+ helper.getOptions().get(SCAN_TIMESTAMP_MILLIS));
}
@Override