You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/17 10:44:58 UTC
[flink-table-store] branch master updated: [FLINK-26679] Add FileStore Continuous Reading ITCase
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 5b34179 [FLINK-26679] Add FileStore Continuous Reading ITCase
5b34179 is described below
commit 5b34179725ef8dfe204df06e14111b060ecba013
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 17 18:44:54 2022 +0800
[FLINK-26679] Add FileStore Continuous Reading ITCase
This closes #48
---
.../flink/table/store/connector/TableStore.java | 12 ++
.../table/store/connector/TableStoreFactory.java | 123 ++++++++------
.../store/connector/TableStoreFactoryOptions.java | 3 +-
.../table/store/connector/sink/TableStoreSink.java | 4 +-
.../source/ContinuousFileSplitEnumerator.java | 2 +-
.../store/connector/source/FileStoreSource.java | 30 +++-
.../store/connector/source/TableStoreSource.java | 4 +-
.../store/connector/ContinuousFileStoreITCase.java | 181 +++++++++++++++++++++
.../store/connector/TableStoreFactoryTest.java | 13 --
.../table/store/connector/TableStoreTestBase.java | 2 +
.../table/store/file/operation/FileStoreScan.java | 2 +
.../store/file/operation/FileStoreScanImpl.java | 5 +
12 files changed, 306 insertions(+), 75 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 64f8da4..acfd094 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -45,6 +46,7 @@ import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunct
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.utils.TypeUtils;
@@ -60,6 +62,8 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
/** A table store api to create source and sink. */
@Experimental
@@ -214,11 +218,19 @@ public class TableStore {
private FileStoreSource buildFileSource(boolean isContinuous) {
FileStore fileStore = buildFileStore();
+
+ boolean latestContinuous = false;
+ if (isContinuous) {
+ LogStartupMode startupMode =
+ new DelegatingConfiguration(options, LOG_PREFIX).get(SCAN);
+ latestContinuous = startupMode == LogStartupMode.LATEST;
+ }
return new FileStoreSource(
fileStore,
primaryKeys.length == 0,
isContinuous,
discoveryIntervalMills(),
+ latestContinuous,
projectedFields,
partitionPredicate,
fieldPredicate);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index 198ba1b..782e331 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -40,7 +42,9 @@ import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.connector.utils.TableConfigUtils;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -50,6 +54,7 @@ import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -59,7 +64,9 @@ import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Default implementation of {@link ManagedTableFactory}. */
@@ -107,13 +114,16 @@ public class TableStoreFactory
}
if (enableChangeTracking(options)) {
- createLogStoreTableFactory(context)
- .onCreateTable(
- createLogContext(context),
- Integer.parseInt(
- options.getOrDefault(
- BUCKET.key(), BUCKET.defaultValue().toString())),
- ignoreIfExists);
+ createOptionalLogStoreFactory(context)
+ .ifPresent(
+ factory ->
+ factory.onCreateTable(
+ createLogContext(context),
+ Integer.parseInt(
+ options.getOrDefault(
+ BUCKET.key(),
+ BUCKET.defaultValue().toString())),
+ ignoreIfExists));
}
}
@@ -136,8 +146,11 @@ public class TableStoreFactory
throw new UncheckedIOException(e);
}
if (enableChangeTracking(options)) {
- createLogStoreTableFactory(context)
- .onDropTable(createLogContext(context), ignoreIfNotExists);
+ createOptionalLogStoreFactory(context)
+ .ifPresent(
+ factory ->
+ factory.onDropTable(
+ createLogContext(context), ignoreIfNotExists));
}
}
@@ -149,37 +162,26 @@ public class TableStoreFactory
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
- boolean streaming =
- context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
- == RuntimeExecutionMode.STREAMING;
- Context logStoreContext = null;
- LogStoreTableFactory logStoreTableFactory = null;
-
- if (enableChangeTracking(context.getCatalogTable().getOptions())) {
- logStoreContext = createLogContext(context);
- logStoreTableFactory = createLogStoreTableFactory(context);
- }
return new TableStoreSource(
- buildTableStore(context), streaming, logStoreContext, logStoreTableFactory);
+ buildTableStore(context),
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING,
+ createLogContext(context),
+ createOptionalLogStoreFactory(context).orElse(null));
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
- Map<String, String> options = context.getCatalogTable().getOptions();
- Context logStoreContext = null;
- LogStoreTableFactory logStoreTableFactory = null;
- if (enableChangeTracking(options)) {
- logStoreContext = createLogContext(context);
- logStoreTableFactory = createLogStoreTableFactory(context);
- }
return new TableStoreSink(
buildTableStore(context),
- LogOptions.LogChangelogMode.valueOf(
- options.getOrDefault(
- LOG_PREFIX + CHANGELOG_MODE.key(),
- CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
- logStoreContext,
- logStoreTableFactory);
+ LogChangelogMode.valueOf(
+ context.getCatalogTable()
+ .getOptions()
+ .getOrDefault(
+ LOG_PREFIX + CHANGELOG_MODE.key(),
+ CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
+ createLogContext(context),
+ createOptionalLogStoreFactory(context).orElse(null));
}
@Override
@@ -197,12 +199,42 @@ public class TableStoreFactory
// ~ Tools ------------------------------------------------------------------
- private static LogStoreTableFactory createLogStoreTableFactory(Context context) {
- return discoverLogStoreFactory(
- context.getClassLoader(),
- context.getCatalogTable()
- .getOptions()
- .getOrDefault(LOG_SYSTEM.key(), LOG_SYSTEM.defaultValue()));
+ private static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(Context context) {
+ Configuration options = new Configuration();
+ context.getCatalogTable().getOptions().forEach(options::setString);
+
+ if (!options.get(CHANGE_TRACKING)) {
+ return Optional.empty();
+ }
+
+ if (options.get(LOG_SYSTEM) == null) {
+ // Use file store continuous reading
+ validateFileStoreContinuous(options);
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ discoverLogStoreFactory(context.getClassLoader(), options.get(LOG_SYSTEM)));
+ }
+
+ /** Rejects log options that file store continuous reading (no log system) does not support. */
+ private static void validateFileStoreContinuous(Configuration options) {
+ // Log options are stored under LOG_PREFIX in the table options.
+ Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
+ if (logOptions.get(CHANGELOG_MODE) == LogChangelogMode.UPSERT) {
+ throw new ValidationException(
+ "File store continuous reading does not support upsert changelog mode.");
+ }
+ if (logOptions.get(CONSISTENCY) == LogConsistency.EVENTUAL) {
+ throw new ValidationException(
+ "File store continuous reading does not support eventual consistency mode.");
+ }
+ // A start position by timestamp is expected to be expressed with timestamp filters instead.
+ if (logOptions.get(SCAN) == LogStartupMode.FROM_TIMESTAMP) {
+ throw new ValidationException(
+ "File store continuous reading does not support from_timestamp scan mode, "
+ + "you can add timestamp filters instead.");
+ }
}
private static Context createLogContext(Context context) {
@@ -219,6 +251,7 @@ public class TableStoreFactory
@VisibleForTesting
static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
return options.entrySet().stream()
+ .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
.filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
.collect(
Collectors.toMap(
@@ -227,13 +260,6 @@ public class TableStoreFactory
}
@VisibleForTesting
- static Map<String, String> filterFileStoreOptions(Map<String, String> options) {
- return options.entrySet().stream()
- .filter(entry -> !entry.getKey().startsWith(LOG_PREFIX))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- @VisibleForTesting
static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
Preconditions.checkArgument(
options.containsKey(FILE_PATH.key()),
@@ -271,8 +297,7 @@ public class TableStoreFactory
.mapToInt(rowType.getFieldNames()::indexOf)
.toArray();
}
- return new TableStore(
- Configuration.fromMap(filterFileStoreOptions(catalogTable.getOptions())))
+ return new TableStore(Configuration.fromMap(catalogTable.getOptions()))
.withTableIdentifier(context.getObjectIdentifier())
.withSchema(rowType)
.withPrimaryKeys(primaryKeys)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 21393ac..c6cde95 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
/** Options for {@link TableStoreFactory}. */
public class TableStoreFactoryOptions {
+ // TODO remove CHANGE_TRACKING, just ignore changes for overwrite
public static final ConfigOption<Boolean> CHANGE_TRACKING =
ConfigOptions.key("change-tracking")
.booleanType()
@@ -34,6 +35,6 @@ public class TableStoreFactoryOptions {
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
.stringType()
- .defaultValue("kafka")
+ .noDefaultValue()
.withDescription("The log system used to keep changes of the table.");
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 9b442a3..808024c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -47,7 +47,7 @@ public class TableStoreSink
private final TableStore tableStore;
private final LogOptions.LogChangelogMode logChangelogMode;
- @Nullable private final DynamicTableFactory.Context logStoreContext;
+ private final DynamicTableFactory.Context logStoreContext;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
private Map<String, String> staticPartitions = new HashMap<>();
@@ -57,7 +57,7 @@ public class TableStoreSink
public TableStoreSink(
TableStore tableStore,
LogOptions.LogChangelogMode logChangelogMode,
- @Nullable DynamicTableFactory.Context logStoreContext,
+ DynamicTableFactory.Context logStoreContext,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableStore = tableStore;
this.logChangelogMode = logChangelogMode;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 35e42cf..ef0e4db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -172,8 +172,8 @@ public class ContinuousFileSplitEnumerator
if (!context.registeredReaders().containsKey(task)) {
return;
}
+ context.assignSplit(splits.poll(), task);
}
- context.assignSplit(splits.poll(), task);
}
});
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 36b7c68..ba38ae3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -33,9 +33,11 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collection;
import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+import static org.apache.flink.util.Preconditions.checkArgument;
/** {@link Source} of file store. */
public class FileStoreSource
@@ -51,6 +53,8 @@ public class FileStoreSource
private final long discoveryInterval;
+ private final boolean latestContinuous;
+
@Nullable private final int[][] projectedFields;
@Nullable private final Predicate partitionPredicate;
@@ -62,13 +66,15 @@ public class FileStoreSource
boolean valueCountMode,
boolean isContinuous,
long discoveryInterval,
+ boolean latestContinuous,
@Nullable int[][] projectedFields,
@Nullable Predicate partitionPredicate,
- final Predicate fieldPredicate) {
+ @Nullable Predicate fieldPredicate) {
this.fileStore = fileStore;
this.valueCountMode = valueCountMode;
this.isContinuous = isContinuous;
this.discoveryInterval = discoveryInterval;
+ this.latestContinuous = latestContinuous;
this.projectedFields = projectedFields;
this.partitionPredicate = partitionPredicate;
this.fieldPredicate = fieldPredicate;
@@ -76,8 +82,7 @@ public class FileStoreSource
@Override
public Boundedness getBoundedness() {
- // TODO supports streaming reading for file store
- return Boundedness.BOUNDED;
+ return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
}
@Override
@@ -126,10 +131,20 @@ public class FileStoreSource
Long snapshotId;
Collection<FileStoreSourceSplit> splits;
if (checkpoint == null) {
- FileStoreScan.Plan plan = scan.plan();
- snapshotId = plan.snapshotId();
- splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+ // first, create new enumerator, plan splits
+ if (latestContinuous) {
+ checkArgument(
+ isContinuous,
+ "The latest continuous can only be true when isContinuous is true.");
+ snapshotId = scan.latestSnapshot();
+ splits = new ArrayList<>();
+ } else {
+ FileStoreScan.Plan plan = scan.plan();
+ snapshotId = plan.snapshotId();
+ splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+ }
} else {
+ // restore from checkpoint
snapshotId = checkpoint.currentSnapshotId();
if (snapshotId == INVALID_SNAPSHOT) {
snapshotId = null;
@@ -137,11 +152,12 @@ public class FileStoreSource
splits = checkpoint.splits();
}
+ // create enumerator from snapshotId and splits
if (isContinuous) {
long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
return new ContinuousFileSplitEnumerator(
context,
- scan.withIncremental(true),
+ scan.withIncremental(true), // the subsequent planning is all incremental
splits,
currentSnapshot,
discoveryInterval);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 66c2da2..6dfbc99 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -58,7 +58,7 @@ public class TableStoreSource
private final TableStore tableStore;
private final boolean streaming;
- @Nullable private final DynamicTableFactory.Context logStoreContext;
+ private final DynamicTableFactory.Context logStoreContext;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
private List<ResolvedExpression> partitionFilters = new ArrayList<>();
@@ -68,7 +68,7 @@ public class TableStoreSource
public TableStoreSource(
TableStore tableStore,
boolean streaming,
- @Nullable DynamicTableFactory.Context logStoreContext,
+ DynamicTableFactory.Context logStoreContext,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableStore = tableStore;
this.streaming = streaming;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
new file mode 100644
index 0000000..e5eebbb
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends AbstractTestBase {
+
+ private TableEnvironment bEnv;
+ private TableEnvironment sEnv;
+
+ @Before
+ public void before() throws IOException {
+ bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+ String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ prepareEnv(bEnv, path);
+ prepareEnv(sEnv, path);
+ }
+
+ private void prepareEnv(TableEnvironment env, String path) {
+ Configuration config = env.getConfig().getConfiguration();
+ config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+ config.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), path);
+ env.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)");
+ env.executeSql(
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");
+ }
+
+ @Test
+ public void testWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+ testSimple("T1");
+ }
+
+ @Test
+ public void testWithPrimaryKey() throws ExecutionException, InterruptedException {
+ testSimple("T2");
+ }
+
+ @Test
+ public void testProjectionWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+ testProjection("T1");
+ }
+
+ @Test
+ public void testProjectionWithPrimaryKey() throws ExecutionException, InterruptedException {
+ testProjection("T2");
+ }
+
+ private void testSimple(String table) throws ExecutionException, InterruptedException {
+ CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM " + table).collect();
+
+ bEnv.executeSql(
+ String.format(
+ "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
+ .await();
+ assertThat(collectFromUnbounded(iterator, 2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+ bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
+ assertThat(collectFromUnbounded(iterator, 1))
+ .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+ }
+
+ private void testProjection(String table) throws ExecutionException, InterruptedException {
+ CloseableIterator<Row> iterator = sEnv.executeSql("SELECT b, c FROM " + table).collect();
+
+ bEnv.executeSql(
+ String.format(
+ "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
+ .await();
+ assertThat(collectFromUnbounded(iterator, 2))
+ .containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));
+
+ bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
+ assertThat(collectFromUnbounded(iterator, 1)).containsExactlyInAnyOrder(Row.of("8", "9"));
+ }
+
+ @Test
+ public void testContinuousLatest() throws ExecutionException, InterruptedException {
+ bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+
+ CloseableIterator<Row> iterator =
+ sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect();
+
+ bEnv.executeSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')").await();
+ assertThat(collectFromUnbounded(iterator, 2))
+ .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
+ }
+
+ @Test
+ public void testUnsupportedUpsert() {
+ assertThatThrownBy(
+ () ->
+ sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */")
+ .collect())
+ .hasStackTraceContaining("File store continuous reading")
+ .hasStackTraceContaining("not support upsert changelog mode");
+ }
+
+ @Test
+ public void testUnsupportedEventual() {
+ assertThatThrownBy(
+ () ->
+ sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */")
+ .collect())
+ .hasStackTraceContaining("File store continuous reading")
+ .hasStackTraceContaining("not support eventual consistency mode");
+ }
+
+ @Test
+ public void testUnsupportedStartupTimestamp() {
+ assertThatThrownBy(
+ () ->
+ sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */")
+ .collect())
+ .hasStackTraceContaining("File store continuous reading")
+ .hasStackTraceContaining(
+ "not support from_timestamp scan mode, you can add timestamp filters instead.");
+ }
+
+ /** Collects the next {@code numElements} rows from {@code iterator}, failing if it ends first. */
+ private List<Row> collectFromUnbounded(CloseableIterator<Row> iterator, int numElements) {
+ if (numElements == 0) {
+ return Collections.emptyList();
+ }
+
+ List<Row> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+
+ if (result.size() == numElements) {
+ return result;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ String.format("The stream ended before reaching the requested %d records. Only %d records were received.",
+ numElements, result.size()));
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index 9b8d006..bc356eb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -163,19 +163,6 @@ public class TableStoreFactoryTest {
}
@Test
- public void testFilterFileStoreOptions() {
- // mix invalid key and leave value to empty to emphasize the deferred validation
- Map<String, String> expectedFileStoreOptions =
- of("dummy.key", "", FILE_PATH.key(), "dummy:/foo/bar");
- Map<String, String> enrichedOptions = new HashMap<>(expectedFileStoreOptions);
- enrichedOptions.put("log.foo", "bar");
- enrichedOptions.put("log.bar", "foo");
-
- assertThat(TableStoreFactory.filterFileStoreOptions(enrichedOptions))
- .containsExactlyInAnyOrderEntriesOf(expectedFileStoreOptions);
- }
-
- @Test
public void testTablePath() {
Map<String, String> options = of(FILE_PATH.key(), "dummy:/foo/bar");
assertThat(TableStoreFactory.tablePath(options, TABLE_IDENTIFIER))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index a1795ff..83df9fd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -51,6 +51,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
@@ -113,6 +114,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
configuration.setString(
TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
+ configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka");
}
protected static ResolvedCatalogTable createResolvedTable(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 3718dd2..403adc7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/** Scan operation which produces a plan. */
public interface FileStoreScan {
+ Long latestSnapshot();
+
boolean snapshotExists(long snapshotId);
Snapshot snapshot(long snapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index bccb0f2..8a5e307 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -79,6 +79,11 @@ public class FileStoreScanImpl implements FileStoreScan {
}
@Override
+ public Long latestSnapshot() {
+ return pathFactory.latestSnapshotId();
+ }
+
+ @Override
public boolean snapshotExists(long snapshotId) {
Path path = pathFactory.toSnapshotPath(snapshotId);
try {