Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/17 10:44:58 UTC

[flink-table-store] branch master updated: [FLINK-26679] Add FileStore Continuous Reading ITCase

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b34179  [FLINK-26679] Add FileStore Continuous Reading ITCase
5b34179 is described below

commit 5b34179725ef8dfe204df06e14111b060ecba013
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Mar 17 18:44:54 2022 +0800

    [FLINK-26679] Add FileStore Continuous Reading ITCase
    
    This closes #48
---
 .../flink/table/store/connector/TableStore.java    |  12 ++
 .../table/store/connector/TableStoreFactory.java   | 123 ++++++++------
 .../store/connector/TableStoreFactoryOptions.java  |   3 +-
 .../table/store/connector/sink/TableStoreSink.java |   4 +-
 .../source/ContinuousFileSplitEnumerator.java      |   2 +-
 .../store/connector/source/FileStoreSource.java    |  30 +++-
 .../store/connector/source/TableStoreSource.java   |   4 +-
 .../store/connector/ContinuousFileStoreITCase.java | 181 +++++++++++++++++++++
 .../store/connector/TableStoreFactoryTest.java     |  13 --
 .../table/store/connector/TableStoreTestBase.java  |   2 +
 .../table/store/file/operation/FileStoreScan.java  |   2 +
 .../store/file/operation/FileStoreScanImpl.java    |   5 +
 12 files changed, 306 insertions(+), 75 deletions(-)
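
Before the diff itself, a minimal sketch of the user-facing behavior this commit enables: a streaming read served directly by the file store, with no Kafka log system involved. Table name, options, and setup mirror the new ContinuousFileStoreITCase below; the root path is a placeholder and this sketch is not part of the commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
    import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;

    public class FileStoreContinuousReadSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment sEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // Root path for managed tables, as the ITCase configures it.
            sEnv.getConfig()
                    .getConfiguration()
                    .setString(TABLE_STORE_PREFIX + FILE_PATH.key(), "/tmp/table-store");
            sEnv.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)");
            // Unbounded: the iterator keeps emitting rows as writers commit new snapshots.
            CloseableIterator<Row> rows = sEnv.executeSql("SELECT * FROM T1").collect();
        }
    }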

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 64f8da4..acfd094 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.connector.files.shaded.org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -45,6 +46,7 @@ import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunct
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.utils.TypeUtils;
@@ -60,6 +62,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
 
 /** A table store api to create source and sink. */
 @Experimental
@@ -214,11 +218,19 @@ public class TableStore {
 
         private FileStoreSource buildFileSource(boolean isContinuous) {
             FileStore fileStore = buildFileStore();
+
+            boolean latestContinuous = false;
+            if (isContinuous) {
+                LogStartupMode startupMode =
+                        new DelegatingConfiguration(options, LOG_PREFIX).get(SCAN);
+                latestContinuous = startupMode == LogStartupMode.LATEST;
+            }
             return new FileStoreSource(
                     fileStore,
                     primaryKeys.length == 0,
                     isContinuous,
                     discoveryIntervalMills(),
+                    latestContinuous,
                     projectedFields,
                     partitionPredicate,
                     fieldPredicate);
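
The startup-mode lookup above works because DelegatingConfiguration prefixes every key it reads with LOG_PREFIX ("log."), so fetching SCAN (key "scan") resolves the table option "log.scan". A minimal standalone sketch of that resolution, not part of the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DelegatingConfiguration;
    import org.apache.flink.table.store.log.LogOptions;
    import org.apache.flink.table.store.log.LogOptions.LogStartupMode;

    public class StartupModeSketch {
        public static void main(String[] args) {
            Configuration options = new Configuration();
            options.setString("log.scan", "latest"); // e.g. set through a SQL hint

            // The delegating view prepends "log." to each lookup.
            LogStartupMode mode =
                    new DelegatingConfiguration(options, LogOptions.LOG_PREFIX)
                            .get(LogOptions.SCAN);

            // LATEST makes the source skip the initial full scan and start
            // from the latest snapshot, reading only incremental changes.
            System.out.println(mode); // LATEST
        }
    }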
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index 198ba1b..782e331 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -40,7 +42,9 @@ import org.apache.flink.table.store.connector.source.TableStoreSource;
 import org.apache.flink.table.store.connector.utils.TableConfigUtils;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
@@ -50,6 +54,7 @@ import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -59,7 +64,9 @@ import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
 import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
 
 /** Default implementation of {@link ManagedTableFactory}. */
@@ -107,13 +114,16 @@ public class TableStoreFactory
         }
 
         if (enableChangeTracking(options)) {
-            createLogStoreTableFactory(context)
-                    .onCreateTable(
-                            createLogContext(context),
-                            Integer.parseInt(
-                                    options.getOrDefault(
-                                            BUCKET.key(), BUCKET.defaultValue().toString())),
-                            ignoreIfExists);
+            createOptionalLogStoreFactory(context)
+                    .ifPresent(
+                            factory ->
+                                    factory.onCreateTable(
+                                            createLogContext(context),
+                                            Integer.parseInt(
+                                                    options.getOrDefault(
+                                                            BUCKET.key(),
+                                                            BUCKET.defaultValue().toString())),
+                                            ignoreIfExists));
         }
     }
 
@@ -136,8 +146,11 @@ public class TableStoreFactory
             throw new UncheckedIOException(e);
         }
         if (enableChangeTracking(options)) {
-            createLogStoreTableFactory(context)
-                    .onDropTable(createLogContext(context), ignoreIfNotExists);
+            createOptionalLogStoreFactory(context)
+                    .ifPresent(
+                            factory ->
+                                    factory.onDropTable(
+                                            createLogContext(context), ignoreIfNotExists));
         }
     }
 
@@ -149,37 +162,26 @@ public class TableStoreFactory
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        boolean streaming =
-                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
-                        == RuntimeExecutionMode.STREAMING;
-        Context logStoreContext = null;
-        LogStoreTableFactory logStoreTableFactory = null;
-
-        if (enableChangeTracking(context.getCatalogTable().getOptions())) {
-            logStoreContext = createLogContext(context);
-            logStoreTableFactory = createLogStoreTableFactory(context);
-        }
         return new TableStoreSource(
-                buildTableStore(context), streaming, logStoreContext, logStoreTableFactory);
+                buildTableStore(context),
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING,
+                createLogContext(context),
+                createOptionalLogStoreFactory(context).orElse(null));
     }
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
-        Map<String, String> options = context.getCatalogTable().getOptions();
-        Context logStoreContext = null;
-        LogStoreTableFactory logStoreTableFactory = null;
-        if (enableChangeTracking(options)) {
-            logStoreContext = createLogContext(context);
-            logStoreTableFactory = createLogStoreTableFactory(context);
-        }
         return new TableStoreSink(
                 buildTableStore(context),
-                LogOptions.LogChangelogMode.valueOf(
-                        options.getOrDefault(
-                                LOG_PREFIX + CHANGELOG_MODE.key(),
-                                CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
-                logStoreContext,
-                logStoreTableFactory);
+                LogChangelogMode.valueOf(
+                        context.getCatalogTable()
+                                .getOptions()
+                                .getOrDefault(
+                                        LOG_PREFIX + CHANGELOG_MODE.key(),
+                                        CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
+                createLogContext(context),
+                createOptionalLogStoreFactory(context).orElse(null));
     }
 
     @Override
@@ -197,12 +199,42 @@ public class TableStoreFactory
 
     // ~ Tools ------------------------------------------------------------------
 
-    private static LogStoreTableFactory createLogStoreTableFactory(Context context) {
-        return discoverLogStoreFactory(
-                context.getClassLoader(),
-                context.getCatalogTable()
-                        .getOptions()
-                        .getOrDefault(LOG_SYSTEM.key(), LOG_SYSTEM.defaultValue()));
+    private static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(Context context) {
+        Configuration options = new Configuration();
+        context.getCatalogTable().getOptions().forEach(options::setString);
+
+        if (!options.get(CHANGE_TRACKING)) {
+            return Optional.empty();
+        }
+
+        if (options.get(LOG_SYSTEM) == null) {
+            // Use file store continuous reading
+            validateFileStoreContinuous(options);
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                discoverLogStoreFactory(context.getClassLoader(), options.get(LOG_SYSTEM)));
+    }
+
+    private static void validateFileStoreContinuous(Configuration options) {
+        Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX);
+        LogChangelogMode changelogMode = logOptions.get(CHANGELOG_MODE);
+        if (changelogMode == LogChangelogMode.UPSERT) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support upsert changelog mode.");
+        }
+        LogConsistency consistency = logOptions.get(CONSISTENCY);
+        if (consistency == LogConsistency.EVENTUAL) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support eventual consistency mode.");
+        }
+        LogStartupMode startupMode = logOptions.get(SCAN);
+        if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
+            throw new ValidationException(
+                    "File store continuous reading dose not support from_timestamp scan mode, "
+                            + "you can add timestamp filters instead.");
+        }
     }
 
     private static Context createLogContext(Context context) {
@@ -219,6 +251,7 @@ public class TableStoreFactory
     @VisibleForTesting
     static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
         return options.entrySet().stream()
+                .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system
                 .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
                 .collect(
                         Collectors.toMap(
@@ -227,13 +260,6 @@ public class TableStoreFactory
     }
 
     @VisibleForTesting
-    static Map<String, String> filterFileStoreOptions(Map<String, String> options) {
-        return options.entrySet().stream()
-                .filter(entry -> !entry.getKey().startsWith(LOG_PREFIX))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    }
-
-    @VisibleForTesting
     static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
         Preconditions.checkArgument(
                 options.containsKey(FILE_PATH.key()),
@@ -271,8 +297,7 @@ public class TableStoreFactory
                             .mapToInt(rowType.getFieldNames()::indexOf)
                             .toArray();
         }
-        return new TableStore(
-                        Configuration.fromMap(filterFileStoreOptions(catalogTable.getOptions())))
+        return new TableStore(Configuration.fromMap(catalogTable.getOptions()))
                 .withTableIdentifier(context.getObjectIdentifier())
                 .withSchema(rowType)
                 .withPrimaryKeys(primaryKeys)
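
Net effect of the factory changes: when log.system is unset, streaming reads are served by the file store itself, and validateFileStoreContinuous rejects the option combinations that only a real log system can honor. A sketch against the streaming environment sEnv from the earlier snippet (queries taken from the new ITCase):

    // Accepted: plain continuous read, or starting from the latest snapshot.
    sEnv.executeSql("SELECT * FROM T1").collect();
    sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect();

    // Rejected with ValidationException: these modes require a log system.
    sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */");
    sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */");
    sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */");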
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 21393ac..c6cde95 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
 /** Options for {@link TableStoreFactory}. */
 public class TableStoreFactoryOptions {
 
+    // TODO remove CHANGE_TRACKING, just ignore changes for overwrite
     public static final ConfigOption<Boolean> CHANGE_TRACKING =
             ConfigOptions.key("change-tracking")
                     .booleanType()
@@ -34,6 +35,6 @@ public class TableStoreFactoryOptions {
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
                     .stringType()
-                    .defaultValue("kafka")
+                    .noDefaultValue()
                     .withDescription("The log system used to keep changes of the table.");
 }
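
Because log.system no longer defaults to "kafka", setups that want the Kafka log must opt in explicitly, which is exactly what TableStoreTestBase starts doing below. A sketch of the two settings involved (constants as used in the tests; the address is a placeholder):

    // log.system has no default anymore; enable Kafka explicitly.
    configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka");
    // The log store's own options keep the "log." prefix.
    configuration.setString(
            TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), "localhost:9092");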
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 9b442a3..808024c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -47,7 +47,7 @@ public class TableStoreSink
 
     private final TableStore tableStore;
     private final LogOptions.LogChangelogMode logChangelogMode;
-    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    private final DynamicTableFactory.Context logStoreContext;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
     private Map<String, String> staticPartitions = new HashMap<>();
@@ -57,7 +57,7 @@ public class TableStoreSink
     public TableStoreSink(
             TableStore tableStore,
             LogOptions.LogChangelogMode logChangelogMode,
-            @Nullable DynamicTableFactory.Context logStoreContext,
+            DynamicTableFactory.Context logStoreContext,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableStore = tableStore;
         this.logChangelogMode = logChangelogMode;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 35e42cf..ef0e4db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -172,8 +172,8 @@ public class ContinuousFileSplitEnumerator
                             if (!context.registeredReaders().containsKey(task)) {
                                 return;
                             }
+                            context.assignSplit(splits.poll(), task);
                         }
-                        context.assignSplit(splits.poll(), task);
                     }
                 });
     }
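
The moved line above is the actual fix: context.assignSplit used to run outside the guarded block, so a split could be polled and handed to a task whose reader was no longer eligible. The corrected shape, condensed (enclosing loop and fields elided):

    if (!context.registeredReaders().containsKey(task)) {
        return; // reader is gone: do not poll, the split stays queued
    }
    context.assignSplit(splits.poll(), task); // poll only after the guard passes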
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 36b7c68..ba38ae3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -33,9 +33,11 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** {@link Source} of file store. */
 public class FileStoreSource
@@ -51,6 +53,8 @@ public class FileStoreSource
 
     private final long discoveryInterval;
 
+    private final boolean latestContinuous;
+
     @Nullable private final int[][] projectedFields;
 
     @Nullable private final Predicate partitionPredicate;
@@ -62,13 +66,15 @@ public class FileStoreSource
             boolean valueCountMode,
             boolean isContinuous,
             long discoveryInterval,
+            boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate partitionPredicate,
-            final Predicate fieldPredicate) {
+            @Nullable Predicate fieldPredicate) {
         this.fileStore = fileStore;
         this.valueCountMode = valueCountMode;
         this.isContinuous = isContinuous;
         this.discoveryInterval = discoveryInterval;
+        this.latestContinuous = latestContinuous;
         this.projectedFields = projectedFields;
         this.partitionPredicate = partitionPredicate;
         this.fieldPredicate = fieldPredicate;
@@ -76,8 +82,7 @@ public class FileStoreSource
 
     @Override
     public Boundedness getBoundedness() {
-        // TODO supports streaming reading for file store
-        return Boundedness.BOUNDED;
+        return isContinuous ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
     }
 
     @Override
@@ -126,10 +131,20 @@ public class FileStoreSource
         Long snapshotId;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            FileStoreScan.Plan plan = scan.plan();
-            snapshotId = plan.snapshotId();
-            splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+            // first start: no checkpoint, plan the initial splits
+            if (latestContinuous) {
+                checkArgument(
+                        isContinuous,
+                        "The latest continuous can only be true when isContinuous is true.");
+                snapshotId = scan.latestSnapshot();
+                splits = new ArrayList<>();
+            } else {
+                FileStoreScan.Plan plan = scan.plan();
+                snapshotId = plan.snapshotId();
+                splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+            }
         } else {
+            // restore from checkpoint
             snapshotId = checkpoint.currentSnapshotId();
             if (snapshotId == INVALID_SNAPSHOT) {
                 snapshotId = null;
@@ -137,11 +152,12 @@ public class FileStoreSource
             splits = checkpoint.splits();
         }
 
+        // create enumerator from snapshotId and splits
         if (isContinuous) {
             long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
             return new ContinuousFileSplitEnumerator(
                     context,
-                    scan.withIncremental(true),
+                    scan.withIncremental(true), // all subsequent planning is incremental
                     splits,
                     currentSnapshot,
                     discoveryInterval);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 66c2da2..6dfbc99 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -58,7 +58,7 @@ public class TableStoreSource
 
     private final TableStore tableStore;
     private final boolean streaming;
-    @Nullable private final DynamicTableFactory.Context logStoreContext;
+    private final DynamicTableFactory.Context logStoreContext;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
     private List<ResolvedExpression> partitionFilters = new ArrayList<>();
@@ -68,7 +68,7 @@ public class TableStoreSource
     public TableStoreSource(
             TableStore tableStore,
             boolean streaming,
-            @Nullable DynamicTableFactory.Context logStoreContext,
+            DynamicTableFactory.Context logStoreContext,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableStore = tableStore;
         this.streaming = streaming;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
new file mode 100644
index 0000000..e5eebbb
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends AbstractTestBase {
+
+    private TableEnvironment bEnv;
+    private TableEnvironment sEnv;
+
+    @Before
+    public void before() throws IOException {
+        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        prepareEnv(bEnv, path);
+        prepareEnv(sEnv, path);
+    }
+
+    private void prepareEnv(TableEnvironment env, String path) {
+        Configuration config = env.getConfig().getConfiguration();
+        config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        config.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), path);
+        env.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)");
+        env.executeSql(
+                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)");
+    }
+
+    @Test
+    public void testWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+        testSimple("T1");
+    }
+
+    @Test
+    public void testWithPrimaryKey() throws ExecutionException, InterruptedException {
+        testSimple("T2");
+    }
+
+    @Test
+    public void testProjectionWithoutPrimaryKey() throws ExecutionException, InterruptedException {
+        testProjection("T1");
+    }
+
+    @Test
+    public void testProjectionWithPrimaryKey() throws ExecutionException, InterruptedException {
+        testProjection("T2");
+    }
+
+    private void testSimple(String table) throws ExecutionException, InterruptedException {
+        CloseableIterator<Row> iterator = sEnv.executeSql("SELECT * FROM " + table).collect();
+
+        bEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
+                .await();
+        assertThat(collectFromUnbounded(iterator, 2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+        bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
+        assertThat(collectFromUnbounded(iterator, 1))
+                .containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+    }
+
+    private void testProjection(String table) throws ExecutionException, InterruptedException {
+        CloseableIterator<Row> iterator = sEnv.executeSql("SELECT b, c FROM " + table).collect();
+
+        bEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table))
+                .await();
+        assertThat(collectFromUnbounded(iterator, 2))
+                .containsExactlyInAnyOrder(Row.of("2", "3"), Row.of("5", "6"));
+
+        bEnv.executeSql(String.format("INSERT INTO %s VALUES ('7', '8', '9')", table)).await();
+        assertThat(collectFromUnbounded(iterator, 1)).containsExactlyInAnyOrder(Row.of("8", "9"));
+    }
+
+    @Test
+    public void testContinuousLatest() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+
+        CloseableIterator<Row> iterator =
+                sEnv.executeSql("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */").collect();
+
+        bEnv.executeSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')").await();
+        assertThat(collectFromUnbounded(iterator, 2))
+                .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
+    }
+
+    @Test
+    public void testUnsupportedUpsert() {
+        assertThatThrownBy(
+                () ->
+                        sEnv.executeSql(
+                                        "SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */")
+                                .collect(),
+                "File store continuous reading dose not support upsert changelog mode");
+    }
+
+    @Test
+    public void testUnsupportedEventual() {
+        assertThatThrownBy(
+                () ->
+                        sEnv.executeSql(
+                                        "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */")
+                                .collect(),
+                "File store continuous reading dose not support eventual consistency mode");
+    }
+
+    @Test
+    public void testUnsupportedStartupTimestamp() {
+        assertThatThrownBy(
+                () ->
+                        sEnv.executeSql(
+                                        "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */")
+                                .collect(),
+                "File store continuous reading dose not support from_timestamp scan mode, "
+                        + "you can add timestamp filters instead.");
+    }
+
+    private List<Row> collectFromUnbounded(CloseableIterator<Row> iterator, int numElements) {
+        if (numElements == 0) {
+            return Collections.emptyList();
+        }
+
+        List<Row> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+
+            if (result.size() == numElements) {
+                return result;
+            }
+        }
+
+        throw new IllegalArgumentException(
+                String.format(
+                        "The stream ended before reaching the requested %d records. Only %d records were received.",
+                        numElements, result.size()));
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index 9b8d006..bc356eb 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -163,19 +163,6 @@ public class TableStoreFactoryTest {
     }
 
     @Test
-    public void testFilterFileStoreOptions() {
-        // mix invalid key and leave value to empty to emphasize the deferred validation
-        Map<String, String> expectedFileStoreOptions =
-                of("dummy.key", "", FILE_PATH.key(), "dummy:/foo/bar");
-        Map<String, String> enrichedOptions = new HashMap<>(expectedFileStoreOptions);
-        enrichedOptions.put("log.foo", "bar");
-        enrichedOptions.put("log.bar", "foo");
-
-        assertThat(TableStoreFactory.filterFileStoreOptions(enrichedOptions))
-                .containsExactlyInAnyOrderEntriesOf(expectedFileStoreOptions);
-    }
-
-    @Test
     public void testTablePath() {
         Map<String, String> options = of(FILE_PATH.key(), "dummy:/foo/bar");
         assertThat(TableStoreFactory.tablePath(options, TABLE_IDENTIFIER))
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index a1795ff..83df9fd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -51,6 +51,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
 import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
@@ -113,6 +114,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
         configuration.setString(
                 TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
         configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
+        configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka");
     }
 
     protected static ResolvedCatalogTable createResolvedTable(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 3718dd2..403adc7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /** Scan operation which produces a plan. */
 public interface FileStoreScan {
 
+    Long latestSnapshot();
+
     boolean snapshotExists(long snapshotId);
 
     Snapshot snapshot(long snapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index bccb0f2..8a5e307 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -79,6 +79,11 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     @Override
+    public Long latestSnapshot() {
+        return pathFactory.latestSnapshotId();
+    }
+
+    @Override
     public boolean snapshotExists(long snapshotId) {
         Path path = pathFactory.toSnapshotPath(snapshotId);
         try {
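
latestSnapshot() returns a boxed Long from the path factory and is null while the table has no snapshot yet; the FileStoreSource hunk above already guards against that. A minimal sketch of the consuming pattern, with names as in the diff:

    Long snapshotId = scan.latestSnapshot(); // null if no snapshot exists yet
    long current =
            snapshotId == null
                    ? Snapshot.FIRST_SNAPSHOT_ID - 1 // position before the first snapshot
                    : snapshotId; // resume reading after the latest snapshot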