Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/12 08:15:00 UTC

[flink-table-store] branch master updated: [FLINK-30341] Introduce audit_log system table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f4efc1 [FLINK-30341] Introduce audit_log system table
81f4efc1 is described below

commit 81f4efc101620a134601c69a3b873f2a8d39fb70
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Dec 12 16:14:54 2022 +0800

    [FLINK-30341] Introduce audit_log system table
    
    This closes #428
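    
    For illustration, the new system table is addressed by appending
    "$audit_log" to a data table's name and is queried like any other
    table. A minimal, standalone sketch modeled on the ITCases in this
    patch (catalog name and warehouse path are placeholders; only the
    "$audit_log" suffix and the 'table-store' catalog type come from the
    patch itself):
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        
        public class AuditLogExample {
            public static void main(String[] args) throws Exception {
                TableEnvironment env =
                        TableEnvironment.create(
                                EnvironmentSettings.newInstance().inBatchMode().build());
                env.executeSql(
                        "CREATE CATALOG my_catalog WITH ("
                                + "'type'='table-store', 'warehouse'='file:///tmp/warehouse')");
                env.executeSql("USE CATALOG my_catalog");
                env.executeSql(
                        "CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING,"
                                + " PRIMARY KEY (a) NOT ENFORCED)");
                env.executeSql("INSERT INTO T VALUES ('1', '2', '3')").await();
                // In batch mode this returns the current state of each key with a
                // leading change-type column ("+I" here); a streaming read emits
                // the full changelog instead, every row carrying INSERT row kind.
                env.executeSql("SELECT * FROM T$audit_log").print();
            }
        }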
---
 .../flink/table/store/utils/ProjectedRowData.java  |   7 +-
 .../store/connector/AbstractTableStoreFactory.java |  12 +-
 .../flink/table/store/connector/FlinkCatalog.java  |   2 +-
 ...taCatalogTable.java => SystemCatalogTable.java} |   8 +-
 .../source/ContinuousFileStoreSource.java          |  13 +-
 .../store/connector/source/FlinkSourceBuilder.java |   8 +-
 ...MetadataSource.java => SimpleSystemSource.java} |  18 +-
 .../connector/source/StaticFileStoreSource.java    |   6 +-
 ...dataTableSource.java => SystemTableSource.java} |  32 ++-
 .../table/store/connector/CatalogITCaseBase.java   |  39 ++-
 .../table/store/connector/CatalogTableITCase.java  |   6 +-
 .../connector/FullCompactionFileStoreITCase.java   |  63 ++++-
 .../source/FileStoreSourceSplitGeneratorTest.java  |   3 +-
 .../org/apache/flink/table/store/CoreOptions.java  |   4 +
 .../table/store/file/catalog/AbstractCatalog.java  |  16 +-
 .../flink/table/store/file/catalog/Catalog.java    |   2 +-
 .../store/table/AppendOnlyFileStoreTable.java      |   6 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   6 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   6 +-
 .../table/{FileStoreTable.java => DataTable.java}  |  42 +--
 .../flink/table/store/table/FileStoreTable.java    |  15 +-
 ...taTableScan.java => AbstractDataTableScan.java} |  38 +--
 .../table/store/table/source/DataTableScan.java    | 137 +---------
 .../ContinuousDataFileSnapshotEnumerator.java      |  10 +-
 .../snapshot/StaticDataFileSnapshotEnumerator.java |   5 +-
 .../table/store/table/system/AuditLogTable.java    | 282 +++++++++++++++++++++
 .../table/{metadata => system}/OptionsTable.java   |   6 +-
 .../table/{metadata => system}/SchemasTable.java   |   6 +-
 .../table/{metadata => system}/SnapshotsTable.java |   6 +-
 .../SystemTableLoader.java}                        |  23 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |   4 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  80 ++++++
 .../flink/table/store/spark/SparkReadITCase.java   |   2 +-
 33 files changed, 594 insertions(+), 319 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
index 9f45ec88..35a71bfb 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
@@ -42,11 +42,11 @@ import java.util.Arrays;
  */
 public class ProjectedRowData implements RowData {
 
-    private final int[] indexMapping;
+    protected final int[] indexMapping;
 
-    private RowData row;
+    protected RowData row;
 
-    private ProjectedRowData(int[] indexMapping) {
+    protected ProjectedRowData(int[] indexMapping) {
         this.indexMapping = indexMapping;
     }
 
@@ -81,6 +81,7 @@ public class ProjectedRowData implements RowData {
     @Override
     public boolean isNullAt(int pos) {
         if (indexMapping[pos] < 0) {
+            // TODO move this logic to hive
             return true;
         }
         return row.isNullAt(indexMapping[pos]);
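
Note: widening these members to protected lets a subclass reuse the
index-mapping scheme (presumably the row wrapper inside the new
AuditLogTable, the only new class in this patch). As a standalone
sketch with made-up names: position i of the projected row reads from
indexMapping[i] in the underlying row, and a negative mapping has no
source column, so it reads as null.

    public class IndexMappedRowDemo {
        public static void main(String[] args) {
            Object[] underlying = {"a", "b", "c"};
            // Projected column 1 has no source column (mapping -1).
            int[] indexMapping = {2, -1, 0};
            System.out.println(valueAt(indexMapping, underlying, 0)); // c
            System.out.println(valueAt(indexMapping, underlying, 1)); // null
        }

        static Object valueAt(int[] indexMapping, Object[] row, int pos) {
            return indexMapping[pos] < 0 ? null : row[indexMapping[pos]];
        }
    }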
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index eb638104..082aa959 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.connector.source.MetadataTableSource;
+import org.apache.flink.table.store.connector.source.SystemTableSource;
 import org.apache.flink.table.store.connector.source.TableStoreSource;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -64,14 +64,16 @@ public abstract class AbstractTableStoreFactory
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
-        if (origin instanceof MetadataCatalogTable) {
-            return new MetadataTableSource(((MetadataCatalogTable) origin).table());
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+        if (origin instanceof SystemCatalogTable) {
+            return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode);
         }
         return new TableStoreSource(
                 context.getObjectIdentifier(),
                 buildFileStoreTable(context),
-                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
-                        == RuntimeExecutionMode.STREAMING,
+                isStreamingMode,
                 context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index d3ada964..253714db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -164,7 +164,7 @@ public class FlinkCatalog extends AbstractCatalog {
                     .put(PATH.key(), catalog.getTableLocation(tablePath).toString());
             return catalogTable;
         } else {
-            return new MetadataCatalogTable(table);
+            return new SystemCatalogTable(table);
         }
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
similarity index 90%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
index 43ceb72e..b7b3d892 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
@@ -28,12 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-/** A {@link CatalogTable} to represent metadata table. */
-public class MetadataCatalogTable implements CatalogTable {
+/** A {@link CatalogTable} to represent system table. */
+public class SystemCatalogTable implements CatalogTable {
 
     private final Table table;
 
-    public MetadataCatalogTable(Table table) {
+    public SystemCatalogTable(Table table) {
         this.table = table;
     }
 
@@ -60,7 +60,7 @@ public class MetadataCatalogTable implements CatalogTable {
 
     @Override
     public CatalogTable copy(Map<String, String> map) {
-        return new MetadataCatalogTable(table.copy(map));
+        return new SystemCatalogTable(table.copy(map));
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index bf9dd90d..00c52d16 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
 
@@ -34,20 +34,17 @@ import java.util.Collection;
 /** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
 public class ContinuousFileStoreSource extends FlinkSource {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private final FileStoreTable table;
-    private final long discoveryInterval;
+    private final DataTable table;
 
     public ContinuousFileStoreSource(
-            FileStoreTable table,
-            long discoveryInterval,
+            DataTable table,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
         super(table, projectedFields, predicate, limit);
         this.table = table;
-        this.discoveryInterval = discoveryInterval;
     }
 
     @Override
@@ -75,7 +72,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 context,
                 splits,
                 nextSnapshotId,
-                discoveryInterval,
+                table.options().continuousDiscoveryInterval().toMillis(),
                 ContinuousDataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 97217cee..bb007e26 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -42,7 +42,6 @@ import javax.annotation.Nullable;
 
 import java.util.Optional;
 
-import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
 
 /** Source builder to build a Flink {@link Source}. */
@@ -108,17 +107,12 @@ public class FlinkSourceBuilder {
         return this;
     }
 
-    private long discoveryIntervalMills() {
-        return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
-    }
-
     private StaticFileStoreSource buildStaticFileSource() {
         return new StaticFileStoreSource(table, projectedFields, predicate, limit);
     }
 
     private ContinuousFileStoreSource buildContinuousFileSource() {
-        return new ContinuousFileStoreSource(
-                table, discoveryIntervalMills(), projectedFields, predicate, limit);
+        return new ContinuousFileStoreSource(table, projectedFields, predicate, limit);
     }
 
     private Source<RowData, ?, ?> buildSource() {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
similarity index 79%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
index 9155c27c..0925eabe 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
@@ -27,12 +27,14 @@ import org.apache.flink.table.store.table.source.TableScan;
 
 import javax.annotation.Nullable;
 
-/** A {@link FlinkSource} for metadata table. */
-public class MetadataSource extends FlinkSource {
+import java.util.Collection;
 
-    private static final long serialVersionUID = 1L;
+/** A {@link FlinkSource} for system table. */
+public class SimpleSystemSource extends FlinkSource {
 
-    public MetadataSource(
+    private static final long serialVersionUID = 2L;
+
+    public SimpleSystemSource(
             Table table,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
@@ -54,7 +56,11 @@ public class MetadataSource extends FlinkSource {
             scan.withFilter(predicate);
         }
 
-        return new StaticFileStoreSplitEnumerator(
-                context, null, new FileStoreSourceSplitGenerator().createSplits(scan.plan()));
+        Collection<FileStoreSourceSplit> splits =
+                checkpoint == null
+                        ? new FileStoreSourceSplitGenerator().createSplits(scan.plan())
+                        : checkpoint.splits();
+
+        return new StaticFileStoreSplitEnumerator(context, null, splits);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index e3f67955..c479c322 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;
@@ -39,10 +39,10 @@ public class StaticFileStoreSource extends FlinkSource {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreTable table;
+    private final DataTable table;
 
     public StaticFileStoreSource(
-            FileStoreTable table,
+            DataTable table,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
similarity index 59%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
index 63941196..45bf4854 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
@@ -18,31 +18,38 @@
 
 package org.apache.flink.table.store.connector.source;
 
+import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.Table;
 
 import javax.annotation.Nullable;
 
-/** A {@link FlinkTableSource} for metadata table. */
-public class MetadataTableSource extends FlinkTableSource {
+/** A {@link FlinkTableSource} for system table. */
+public class SystemTableSource extends FlinkTableSource {
 
     private final Table table;
+    private final boolean isStreamingMode;
 
-    public MetadataTableSource(Table table) {
+    public SystemTableSource(Table table, boolean isStreamingMode) {
         super(table);
         this.table = table;
+        this.isStreamingMode = isStreamingMode;
     }
 
-    public MetadataTableSource(
+    public SystemTableSource(
             Table table,
+            boolean isStreamingMode,
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
             @Nullable Long limit) {
         super(table, predicate, projectFields, limit);
         this.table = table;
+        this.isStreamingMode = isStreamingMode;
     }
 
     @Override
@@ -52,16 +59,27 @@ public class MetadataTableSource extends FlinkTableSource {
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        return SourceProvider.of(new MetadataSource(table, projectFields, predicate, limit));
+        Source<RowData, ?, ?> source;
+        if (table instanceof DataTable) {
+            DataTable dataTable = (DataTable) table;
+            source =
+                    isStreamingMode
+                            ? new ContinuousFileStoreSource(
+                                    dataTable, projectFields, predicate, limit)
+                            : new StaticFileStoreSource(dataTable, projectFields, predicate, limit);
+        } else {
+            source = new SimpleSystemSource(table, projectFields, predicate, limit);
+        }
+        return SourceProvider.of(source);
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new MetadataTableSource(table, predicate, projectFields, limit);
+        return new SystemTableSource(table, isStreamingMode, predicate, projectFields, limit);
     }
 
     @Override
     public String asSummaryString() {
-        return "TableStore-MetadataSource";
+        return "TableStore-SystemTable-Source";
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index c705e21f..17d066cd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -34,24 +36,49 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
 
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
 /** ITCase for catalog. */
 public abstract class CatalogITCaseBase extends AbstractTestBase {
 
     protected TableEnvironment tEnv;
+    protected TableEnvironment sEnv;
 
     @Before
     public void before() throws IOException {
         tEnv =
                 TableEnvironmentTestUtils.create(
                         EnvironmentSettings.newInstance().inBatchMode().build());
+        String catalog = "TABLE_STORE";
         tEnv.executeSql(
                 String.format(
-                        "CREATE CATALOG TABLE_STORE WITH ("
-                                + "'type'='table-store', 'warehouse'='%s')",
-                        TEMPORARY_FOLDER.newFolder().toURI()));
-        tEnv.useCatalog("TABLE_STORE");
+                        "CREATE CATALOG %s WITH (" + "'type'='table-store', 'warehouse'='%s')",
+                        catalog, TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog(catalog);
+
+        sEnv =
+                TableEnvironmentTestUtils.create(
+                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+        sEnv.useCatalog(catalog);
+
+        prepareConfiguration(tEnv);
+        prepareConfiguration(sEnv);
+    }
+
+    private void prepareConfiguration(TableEnvironment env) {
+        Configuration config = env.getConfig().getConfiguration();
+        config.set(
+                ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                defaultParallelism());
+    }
+
+    protected int defaultParallelism() {
+        return 2;
     }
 
     protected List<Row> sql(String query, Object... args) throws Exception {
@@ -60,6 +87,10 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
         }
     }
 
+    protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
+        return sEnv.executeSql(String.format(query, args)).collect();
+    }
+
     protected CatalogTable table(String tableName) throws TableNotExistException {
         Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
         CatalogBaseTable table =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
index 9aa1d8bb..f2f652d5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -64,12 +64,12 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                 .hasRootCauseMessage(
                         String.format(
                                 "Table name[%s] cannot contain '%s' separator",
-                                "T$snapshots", METADATA_TABLE_SPLITTER));
+                                "T$snapshots", SYSTEM_TABLE_SPLITTER));
         assertThatThrownBy(() -> sql("CREATE TABLE T$aa$bb (a INT, b INT)"))
                 .hasRootCauseMessage(
                         String.format(
                                 "Table name[%s] cannot contain '%s' separator",
-                                "T$aa$bb", METADATA_TABLE_SPLITTER));
+                                "T$aa$bb", SYSTEM_TABLE_SPLITTER));
     }
 
     @Test
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index 81d7f3bc..47fb004e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -20,23 +20,26 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.List;
+import java.io.IOException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** SQL ITCase for continuous file store. */
-public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
+public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
     private final String table = "T";
 
     @Override
-    protected List<String> ddl() {
+    @Before
+    public void before() throws IOException {
+        super.before();
         String options =
                 " WITH('changelog-producer'='full-compaction', 'changelog-producer.compaction-interval' = '1s')";
-        return Arrays.asList(
+        tEnv.executeSql(
                 "CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
                         + options);
     }
@@ -46,11 +49,11 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
 
-        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+        sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
 
-        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
     }
 
@@ -61,15 +64,55 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
                         streamSqlIter(
                                 "SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted') */", table));
 
-        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+        sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
 
-        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
 
-        assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
+        assertThat(sql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
                 .containsExactlyInAnyOrder(
                         Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
     }
+
+    @Test
+    public void testUpdate() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+        sql("INSERT INTO %s VALUES ('1', '2', '3')", table);
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "2", "3"));
+
+        sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.UPDATE_BEFORE, "1", "2", "3"),
+                        Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testUpdateAuditLog() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM %s$audit_log", table));
+
+        sql("INSERT INTO %s VALUES ('1', '2', '3')", table);
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "2", "3"));
+
+        sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, "-U", "1", "2", "3"),
+                        Row.ofKind(RowKind.INSERT, "+U", "1", "4", "5"));
+
+        iterator.close();
+
+        // BATCH mode
+        assertThat(sql("SELECT * FROM %s$audit_log", table))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
+    }
 }
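
Note: the change-type strings asserted in testUpdateAuditLog are the
short forms of Flink's RowKind enum; the audit log re-emits every
change as an INSERT row whose first column carries the original kind
as a string. A standalone check of that mapping (RowKind is Flink's
own type; nothing below is specific to this patch):

    import org.apache.flink.types.RowKind;

    public class RowKindCodes {
        public static void main(String[] args) {
            // Prints INSERT=+I, UPDATE_BEFORE=-U, UPDATE_AFTER=+U, DELETE=-D.
            for (RowKind kind : RowKind.values()) {
                System.out.println(kind + "=" + kind.shortString());
            }
        }
    }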
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 9e30c242..00ccf65f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
 
@@ -76,7 +77,7 @@ public class FileStoreSourceSplitGeneratorTest {
                     }
                 };
         List<DataSplit> scanSplits =
-                DataTableScan.generateSplits(
+                AbstractDataTableScan.generateSplits(
                         false,
                         Collections::singletonList,
                         plan.groupByPartFiles(plan.files(FileKind.ADD)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index f4512740..aa8d40f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -466,6 +466,10 @@ public class CoreOptions implements Serializable {
         return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
     }
 
+    public Duration continuousDiscoveryInterval() {
+        return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
+    }
+
     public int localSortMaxNumFileHandles() {
         return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
index 6cbbdcf0..4b7a7b70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.Table;
-import org.apache.flink.table.store.table.metadata.MetadataTableLoader;
+import org.apache.flink.table.store.table.system.SystemTableLoader;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -34,11 +34,11 @@ public abstract class AbstractCatalog implements Catalog {
 
     @Override
     public Path getTableLocation(ObjectPath tablePath) {
-        if (tablePath.getObjectName().contains(METADATA_TABLE_SPLITTER)) {
+        if (tablePath.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
             throw new IllegalArgumentException(
                     String.format(
                             "Table name[%s] cannot contain '%s' separator",
-                            tablePath.getObjectName(), METADATA_TABLE_SPLITTER));
+                            tablePath.getObjectName(), SYSTEM_TABLE_SPLITTER));
         }
         return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
     }
@@ -46,21 +46,21 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public Table getTable(ObjectPath tablePath) throws TableNotExistException {
         String inputTableName = tablePath.getObjectName();
-        if (inputTableName.contains(METADATA_TABLE_SPLITTER)) {
-            String[] splits = StringUtils.split(inputTableName, METADATA_TABLE_SPLITTER);
+        if (inputTableName.contains(SYSTEM_TABLE_SPLITTER)) {
+            String[] splits = StringUtils.split(inputTableName, SYSTEM_TABLE_SPLITTER);
             if (splits.length != 2) {
                 throw new IllegalArgumentException(
-                        "Metadata table can only contain one '$' separator, but this is: "
+                        "System table can only contain one '$' separator, but this is: "
                                 + inputTableName);
             }
             String table = splits[0];
-            String metadata = splits[1];
+            String type = splits[1];
             ObjectPath originTablePath = new ObjectPath(tablePath.getDatabaseName(), table);
             if (!tableExists(originTablePath)) {
                 throw new TableNotExistException(tablePath);
             }
             Path location = getTableLocation(originTablePath);
-            return MetadataTableLoader.load(metadata, location);
+            return SystemTableLoader.load(type, location);
         } else {
             TableSchema tableSchema = getTableSchema(tablePath);
             return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
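
Note: with this resolution every system table is addressed as
<table>$<type>, with exactly one '$'. A standalone illustration of the
split step (the separator and the two-part rule come from the code
above; class and variable names are made up):

    import org.apache.commons.lang3.StringUtils;

    public class SystemTableNameDemo {
        public static void main(String[] args) {
            String[] splits = StringUtils.split("T$audit_log", "$");
            // splits.length must be 2, mirroring the check above.
            System.out.println(splits[0]); // data table name: T
            System.out.println(splits[1]); // system table type: audit_log
        }
    }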
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index a29977cf..919f6067 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -36,7 +36,7 @@ public interface Catalog extends AutoCloseable {
 
     String DEFAULT_DATABASE = "default";
 
-    String METADATA_TABLE_SPLITTER = "$";
+    String SYSTEM_TABLE_SPLITTER = "$";
 
     /**
      * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 52dd03f6..04adebfd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -32,9 +32,9 @@ import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
 import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
@@ -75,9 +75,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public DataTableScan newScan() {
+    public AbstractDataTableScan newScan() {
         AppendOnlyFileStoreScan scan = store().newScan();
-        return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+        return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new AppendOnlySplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index b6ba6111..0da5c85b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -38,7 +38,7 @@ import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -90,9 +90,9 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public DataTableScan newScan() {
+    public AbstractDataTableScan newScan() {
         KeyValueFileStoreScan scan = store().newScan();
-        return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+        return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 803535c4..f92cfd07 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -41,7 +41,7 @@ import org.apache.flink.table.store.table.sink.SequenceGenerator;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -146,9 +146,9 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    public DataTableScan newScan() {
+    public AbstractDataTableScan newScan() {
         KeyValueFileStoreScan scan = store().newScan();
-        return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+        return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
similarity index 55%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
index b806ea2d..60e62685 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
@@ -20,52 +20,18 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.types.logical.RowType;
 
-import java.util.List;
-import java.util.Map;
+/** A {@link Table} for data. */
+public interface DataTable extends Table {
 
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
+    @Override
+    DataTableScan newScan();
 
     CoreOptions options();
 
     Path location();
 
-    @Override
-    default String name() {
-        return location().getName();
-    }
-
-    @Override
-    default RowType rowType() {
-        return schema().logicalRowType();
-    }
-
-    @Override
-    default List<String> partitionKeys() {
-        return schema().partitionKeys();
-    }
-
-    TableSchema schema();
-
     SnapshotManager snapshotManager();
-
-    @Override
-    FileStoreTable copy(Map<String, String> dynamicOptions);
-
-    @Override
-    DataTableScan newScan();
-
-    @Override
-    default BucketComputer bucketComputer() {
-        return new BucketComputer(schema());
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index b806ea2d..877a5690 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.BucketComputer;
-import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.List;
@@ -33,11 +29,7 @@ import java.util.Map;
  * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
  * and writing of {@link org.apache.flink.table.data.RowData}.
  */
-public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
-
-    CoreOptions options();
-
-    Path location();
+public interface FileStoreTable extends DataTable, SupportsPartition, SupportsWrite {
 
     @Override
     default String name() {
@@ -56,14 +48,9 @@ public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite
 
     TableSchema schema();
 
-    SnapshotManager snapshotManager();
-
     @Override
     FileStoreTable copy(Map<String, String> dynamicOptions);
 
-    @Override
-    DataTableScan newScan();
-
     @Override
     default BucketComputer bucketComputer() {
         return new BucketComputer(schema());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
similarity index 85%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index 73e23986..c545aeca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -31,8 +31,6 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +39,7 @@ import java.util.Optional;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class DataTableScan implements TableScan {
+public abstract class AbstractDataTableScan implements DataTableScan {
 
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
@@ -50,7 +48,7 @@ public abstract class DataTableScan implements TableScan {
 
     private ScanKind scanKind = ScanKind.ALL;
 
-    protected DataTableScan(
+    protected AbstractDataTableScan(
             FileStoreScan scan,
             TableSchema tableSchema,
             FileStorePathFactory pathFactory,
@@ -61,13 +59,14 @@ public abstract class DataTableScan implements TableScan {
         this.options = options;
     }
 
-    public DataTableScan withSnapshot(long snapshotId) {
+    @Override
+    public AbstractDataTableScan withSnapshot(long snapshotId) {
         scan.withSnapshot(snapshotId);
         return this;
     }
 
     @Override
-    public DataTableScan withFilter(Predicate predicate) {
+    public AbstractDataTableScan withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
         int[] fieldIdxToPartitionIdx =
                 tableSchema.fields().stream()
@@ -94,19 +93,21 @@ public abstract class DataTableScan implements TableScan {
         return this;
     }
 
-    public DataTableScan withKind(ScanKind scanKind) {
+    @Override
+    public AbstractDataTableScan withKind(ScanKind scanKind) {
         this.scanKind = scanKind;
         scan.withKind(scanKind);
         return this;
     }
 
-    public DataTableScan withLevel(int level) {
+    @Override
+    public AbstractDataTableScan withLevel(int level) {
         scan.withLevel(level);
         return this;
     }
 
     @VisibleForTesting
-    public DataTableScan withBucket(int bucket) {
+    public AbstractDataTableScan withBucket(int bucket) {
         scan.withBucket(bucket);
         return this;
     }
@@ -160,23 +161,4 @@ public abstract class DataTableScan implements TableScan {
     public SnapshotManager snapshotManager() {
         return new SnapshotManager(pathFactory.root());
     }
-
-    /** Scanning plan containing snapshot ID and input splits. */
-    public static class DataFilePlan implements Plan {
-
-        @Nullable public final Long snapshotId;
-        public final List<DataSplit> splits;
-
-        @VisibleForTesting
-        public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
-            this.snapshotId = snapshotId;
-            this.splits = splits;
-        }
-
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        @Override
-        public List<Split> splits() {
-            return (List) splits;
-        }
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 73e23986..5509929f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -19,150 +19,29 @@
 package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.FileKind;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.ScanKind;
 import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+/** A {@link TableScan} for reading data. */
+public interface DataTableScan extends TableScan {
 
-/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class DataTableScan implements TableScan {
+    DataTableScan withKind(ScanKind kind);
 
-    private final FileStoreScan scan;
-    private final TableSchema tableSchema;
-    private final FileStorePathFactory pathFactory;
-    private final CoreOptions options;
+    DataTableScan withSnapshot(long snapshotId);
 
-    private ScanKind scanKind = ScanKind.ALL;
-
-    protected DataTableScan(
-            FileStoreScan scan,
-            TableSchema tableSchema,
-            FileStorePathFactory pathFactory,
-            CoreOptions options) {
-        this.scan = scan;
-        this.tableSchema = tableSchema;
-        this.pathFactory = pathFactory;
-        this.options = options;
-    }
-
-    public DataTableScan withSnapshot(long snapshotId) {
-        scan.withSnapshot(snapshotId);
-        return this;
-    }
+    DataTableScan withLevel(int level);
 
     @Override
-    public DataTableScan withFilter(Predicate predicate) {
-        List<String> partitionKeys = tableSchema.partitionKeys();
-        int[] fieldIdxToPartitionIdx =
-                tableSchema.fields().stream()
-                        .mapToInt(f -> partitionKeys.indexOf(f.name()))
-                        .toArray();
+    DataTableScan withFilter(Predicate predicate);
 
-        List<Predicate> partitionFilters = new ArrayList<>();
-        List<Predicate> nonPartitionFilters = new ArrayList<>();
-        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = transformFieldMapping(p, fieldIdxToPartitionIdx);
-            if (mapped.isPresent()) {
-                partitionFilters.add(mapped.get());
-            } else {
-                nonPartitionFilters.add(p);
-            }
-        }
-
-        if (partitionFilters.size() > 0) {
-            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
-        }
-        if (nonPartitionFilters.size() > 0) {
-            withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
-        }
-        return this;
-    }
-
-    public DataTableScan withKind(ScanKind scanKind) {
-        this.scanKind = scanKind;
-        scan.withKind(scanKind);
-        return this;
-    }
-
-    public DataTableScan withLevel(int level) {
-        scan.withLevel(level);
-        return this;
-    }
-
-    @VisibleForTesting
-    public DataTableScan withBucket(int bucket) {
-        scan.withBucket(bucket);
-        return this;
-    }
-
-    @Override
-    public DataFilePlan plan() {
-        FileStoreScan.Plan plan = scan.plan();
-        return new DataFilePlan(
-                plan.snapshotId(), generateSplits(plan.groupByPartFiles(plan.files(FileKind.ADD))));
-    }
-
-    private List<DataSplit> generateSplits(
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
-        return generateSplits(
-                scanKind != ScanKind.ALL, splitGenerator(pathFactory), groupedDataFiles);
-    }
-
-    @VisibleForTesting
-    public static List<DataSplit> generateSplits(
-            boolean isIncremental,
-            SplitGenerator splitGenerator,
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
-        List<DataSplit> splits = new ArrayList<>();
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
-                groupedDataFiles.entrySet()) {
-            BinaryRowData partition = entry.getKey();
-            Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
-                int bucket = bucketEntry.getKey();
-                if (isIncremental) {
-                    // Don't split when incremental
-                    splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
-                } else {
-                    splitGenerator.split(bucketEntry.getValue()).stream()
-                            .map(files -> new DataSplit(partition, bucket, files, false))
-                            .forEach(splits::add);
-                }
-            }
-        }
-        return splits;
-    }
-
-    protected abstract SplitGenerator splitGenerator(FileStorePathFactory pathFactory);
-
-    protected abstract void withNonPartitionFilter(Predicate predicate);
-
-    public CoreOptions options() {
-        return options;
-    }
-
-    public SnapshotManager snapshotManager() {
-        return new SnapshotManager(pathFactory.root());
-    }
+    DataTableScan.DataFilePlan plan();
 
     /** Scanning plan containing snapshot ID and input splits. */
-    public static class DataFilePlan implements Plan {
+    class DataFilePlan implements Plan {
 
         @Nullable public final Long snapshotId;
         public final List<DataSplit> splits;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index d1a462ef..bb9debfa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
 import org.apache.flink.util.Preconditions;
 
@@ -109,7 +109,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
     // ------------------------------------------------------------------------
 
     public static ContinuousDataFileSnapshotEnumerator createWithSnapshotStarting(
-            FileStoreTable table, DataTableScan scan) {
+            DataTable table, DataTableScan scan) {
         StartingScanner startingScanner =
                 table.options().startupMode() == CoreOptions.StartupMode.COMPACTED
                         ? new CompactedStartingScanner()
@@ -119,7 +119,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
     }
 
     public static ContinuousDataFileSnapshotEnumerator create(
-            FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
+            DataTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
         return new ContinuousDataFileSnapshotEnumerator(
                 table.location(),
                 scan,
@@ -128,7 +128,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
                 nextSnapshotId);
     }
 
-    private static StartingScanner createStartingScanner(FileStoreTable table) {
+    private static StartingScanner createStartingScanner(DataTable table) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
         Long startupMillis = table.options().logScanTimestampMills();
         if (startupMode == CoreOptions.StartupMode.FULL) {
@@ -151,7 +151,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
         }
     }
 
-    private static FollowUpScanner createFollowUpScanner(FileStoreTable table, DataTableScan scan) {
+    private static FollowUpScanner createFollowUpScanner(DataTable table, DataTableScan scan) {
         CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
         if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
             return new DeltaFollowUpScanner();
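
The FileStoreTable-to-DataTable loosening in this file (and in
StaticDataFileSnapshotEnumerator below) is what lets the new audit_log table be
planned like any data table: the enumerators only need the DataTable contract
(options(), location(), snapshotManager(), newScan()). A hedged sketch of the
wiring, assuming a FileStoreTable obtained elsewhere; the composition is implied
by the new signatures rather than spelled out in the patch:

    import org.apache.flink.table.store.table.DataTable;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
    import org.apache.flink.table.store.table.system.AuditLogTable;

    public class AuditLogStreamingSketch {
        static ContinuousDataFileSnapshotEnumerator enumeratorFor(FileStoreTable fileStoreTable) {
            // Any DataTable is accepted now, not only FileStoreTable.
            DataTable table = new AuditLogTable(fileStoreTable);
            return ContinuousDataFileSnapshotEnumerator.createWithSnapshotStarting(
                    table, table.newScan());
        }
    }
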
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index f12e9ac1..582ba9c2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table.source.snapshot;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
 
 import javax.annotation.Nullable;
@@ -59,8 +59,7 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
     //  static create methods
     // ------------------------------------------------------------------------
 
-    public static StaticDataFileSnapshotEnumerator create(
-            FileStoreTable table, DataTableScan scan) {
+    public static StaticDataFileSnapshotEnumerator create(DataTable table, DataTableScan scan) {
         CoreOptions.StartupMode startupMode = table.options().startupMode();
         StartingScanner startingScanner;
         if (startupMode == CoreOptions.StartupMode.FULL
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
new file mode 100644
index 00000000..52ba766e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.system;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.DataTable;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.ProjectedRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.utils.RecordReaderUtils.transform;
+
+/** A {@link Table} for reading the audit log of a table. */
+public class AuditLogTable implements DataTable {
+
+    public static final String AUDIT_LOG = "audit_log";
+
+    public static final String ROW_KIND = "rowkind";
+
+    public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
+            p -> {
+                if (p.index() == 0) {
+                    return Optional.empty();
+                }
+                return Optional.of(
+                        new LeafPredicate(
+                                p.function(),
+                                p.type(),
+                                p.index() - 1,
+                                p.fieldName(),
+                                p.literals()));
+            };
+
+    private final FileStoreTable dataTable;
+
+    public AuditLogTable(FileStoreTable dataTable) {
+        this.dataTable = dataTable;
+    }
+
+    @Override
+    public String name() {
+        return dataTable.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
+    }
+
+    @Override
+    public RowType rowType() {
+        List<RowField> fields = new ArrayList<>();
+        fields.add(new RowField(ROW_KIND, new VarCharType(VarCharType.MAX_LENGTH)));
+        fields.addAll(dataTable.rowType().getFields());
+        return new RowType(fields);
+    }
+
+    @Override
+    public DataTableScan newScan() {
+        return new AuditLogScan(dataTable.newScan());
+    }
+
+    @Override
+    public CoreOptions options() {
+        return dataTable.options();
+    }
+
+    @Override
+    public Path location() {
+        return dataTable.location();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return dataTable.snapshotManager();
+    }
+
+    @Override
+    public TableRead newRead() {
+        return new AuditLogRead(dataTable.newRead());
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new AuditLogTable(dataTable.copy(dynamicOptions));
+    }
+
+    /** Convert a predicate for push-down to dataScan and dataRead: row kind filters are dropped, remaining field indices shift left by one. */
+    private Optional<Predicate> convert(Predicate predicate) {
+        List<Predicate> result =
+                PredicateBuilder.splitAnd(predicate).stream()
+                        .map(p -> p.visit(PREDICATE_CONVERTER))
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .collect(Collectors.toList());
+        if (result.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(PredicateBuilder.and(result));
+    }
+
+    private class AuditLogScan implements DataTableScan {
+
+        private final DataTableScan dataScan;
+
+        private AuditLogScan(DataTableScan dataScan) {
+            this.dataScan = dataScan;
+        }
+
+        @Override
+        public DataTableScan withFilter(Predicate predicate) {
+            convert(predicate).ifPresent(dataScan::withFilter);
+            return this;
+        }
+
+        @Override
+        public DataTableScan withKind(ScanKind kind) {
+            dataScan.withKind(kind);
+            return this;
+        }
+
+        @Override
+        public DataTableScan withSnapshot(long snapshotId) {
+            dataScan.withSnapshot(snapshotId);
+            return this;
+        }
+
+        @Override
+        public DataTableScan withLevel(int level) {
+            dataScan.withLevel(level);
+            return this;
+        }
+
+        @Override
+        public DataTableScan.DataFilePlan plan() {
+            return dataScan.plan();
+        }
+    }
+
+    private class AuditLogRead implements TableRead {
+
+        private final TableRead dataRead;
+
+        private int[] readProjection;
+
+        private AuditLogRead(TableRead dataRead) {
+            this.dataRead = dataRead;
+            this.readProjection = defaultProjection();
+        }
+
+        /** Default projection: the row kind column first, followed by all data fields. */
+        private int[] defaultProjection() {
+            int dataFieldCount = dataTable.rowType().getFieldCount();
+            int[] projection = new int[dataFieldCount + 1];
+            projection[0] = -1;
+            for (int i = 0; i < dataFieldCount; i++) {
+                projection[i + 1] = i;
+            }
+            return projection;
+        }
+
+        @Override
+        public TableRead withFilter(Predicate predicate) {
+            convert(predicate).ifPresent(dataRead::withFilter);
+            return this;
+        }
+
+        @Override
+        public TableRead withProjection(int[][] projection) {
+            // data projection to push down to dataRead
+            List<int[]> dataProjection = new ArrayList<>();
+            // read projection to remap records returned by dataRead
+            List<Integer> readProjection = new ArrayList<>();
+            boolean rowKindAppeared = false;
+            for (int i = 0; i < projection.length; i++) {
+                int[] field = projection[i];
+                int topField = field[0];
+                if (topField == 0) {
+                    rowKindAppeared = true;
+                    readProjection.add(-1);
+                } else {
+                    int[] newField = Arrays.copyOf(field, field.length);
+                    newField[0] = newField[0] - 1;
+                    dataProjection.add(newField);
+
+                    // If the row kind column has not appeared yet, the index is
+                    // unchanged; once it has appeared, every following field is
+                    // offset by one position in the rows returned by dataRead.
+                    readProjection.add(rowKindAppeared ? i - 1 : i);
+                }
+            }
+            this.readProjection = Ints.toArray(readProjection);
+            dataRead.withProjection(dataProjection.toArray(new int[0][]));
+            return this;
+        }
+
+        @Override
+        public RecordReader<RowData> createReader(Split split) throws IOException {
+            return transform(dataRead.createReader(split), this::convertRow);
+        }
+
+        private RowData convertRow(RowData data) {
+            return new AuditLogRowData(readProjection, data);
+        }
+    }
+
+    /** A {@link ProjectedRowData} that returns the row kind when the mapping index is negative. */
+    private static class AuditLogRowData extends ProjectedRowData {
+
+        private AuditLogRowData(int[] indexMapping, RowData row) {
+            super(indexMapping);
+            replaceRow(row);
+        }
+
+        @Override
+        public RowKind getRowKind() {
+            return RowKind.INSERT;
+        }
+
+        @Override
+        public void setRowKind(RowKind kind) {
+            throw new UnsupportedOperationException(
+                    "Set row kind is not supported in AuditLogRowData.");
+        }
+
+        @Override
+        public boolean isNullAt(int pos) {
+            if (indexMapping[pos] < 0) {
+                // the row kind column is never null
+                return false;
+            }
+            return super.isNullAt(pos);
+        }
+
+        @Override
+        public StringData getString(int pos) {
+            if (indexMapping[pos] < 0) {
+                return StringData.fromString(row.getRowKind().shortString());
+            }
+            return super.getString(pos);
+        }
+    }
+}
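
The trickiest part of AuditLogTable is withProjection's double bookkeeping:
dataProjection is pushed down to the underlying read, while readProjection remaps
the rows that come back. A self-contained illustration of the remapping for an
outer projection (f2, rowkind, f0) over a data schema (f0, f1, f2); the field
names and values are invented for this sketch:

    import java.util.ArrayList;
    import java.util.List;

    public class AuditLogProjectionDemo {
        public static void main(String[] args) {
            // Outer schema: position 0 is the synthetic rowkind column.
            int[] outer = {3, 0, 1}; // read f2, rowkind, f0
            List<Integer> dataProjection = new ArrayList<>(); // pushed to dataRead
            List<Integer> readProjection = new ArrayList<>(); // applied to results
            boolean rowKindAppeared = false;
            for (int i = 0; i < outer.length; i++) {
                if (outer[i] == 0) {
                    rowKindAppeared = true;
                    readProjection.add(-1); // -1 means "emit the row kind"
                } else {
                    dataProjection.add(outer[i] - 1); // shift past rowkind
                    readProjection.add(rowKindAppeared ? i - 1 : i);
                }
            }
            System.out.println("data: " + dataProjection); // data: [2, 0]
            System.out.println("read: " + readProjection); // read: [0, -1, 1]
        }
    }

So dataRead returns (f2, f0), and AuditLogRowData maps output positions 0, 1, 2
to data index 0, the row kind, and data index 1 respectively.
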
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
index dc906b23..c5901267 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
@@ -42,7 +42,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 import static org.apache.flink.table.store.file.utils.SerializationUtils.newStringType;
 
 /** A {@link Table} for showing options of table. */
@@ -66,7 +66,7 @@ public class OptionsTable implements Table {
 
     @Override
     public String name() {
-        return location.getName() + METADATA_TABLE_SPLITTER + OPTIONS;
+        return location.getName() + SYSTEM_TABLE_SPLITTER + OPTIONS;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
index 7128c854..30619b6f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
@@ -46,7 +46,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
 /** A {@link Table} for showing schemas of table. */
 public class SchemasTable implements Table {
@@ -77,7 +77,7 @@ public class SchemasTable implements Table {
 
     @Override
     public String name() {
-        return location.getName() + METADATA_TABLE_SPLITTER + SCHEMAS;
+        return location.getName() + SYSTEM_TABLE_SPLITTER + SCHEMAS;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
index 7d1672f8..8890431e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
@@ -50,7 +50,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
 /** A {@link Table} for showing committing snapshots of table. */
 public class SnapshotsTable implements Table {
@@ -79,7 +79,7 @@ public class SnapshotsTable implements Table {
 
     @Override
     public String name() {
-        return location.getName() + METADATA_TABLE_SPLITTER + SNAPSHOTS;
+        return location.getName() + SYSTEM_TABLE_SPLITTER + SNAPSHOTS;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
similarity index 58%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
index 0b23ada0..620ff8d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
@@ -16,29 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.Table;
 
-import static org.apache.flink.table.store.table.metadata.OptionsTable.OPTIONS;
-import static org.apache.flink.table.store.table.metadata.SchemasTable.SCHEMAS;
-import static org.apache.flink.table.store.table.metadata.SnapshotsTable.SNAPSHOTS;
+import static org.apache.flink.table.store.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.flink.table.store.table.system.OptionsTable.OPTIONS;
+import static org.apache.flink.table.store.table.system.SchemasTable.SCHEMAS;
+import static org.apache.flink.table.store.table.system.SnapshotsTable.SNAPSHOTS;
 
-/** Loader to load metadata {@link Table}s. */
-public class MetadataTableLoader {
+/** Loader for system {@link Table}s. */
+public class SystemTableLoader {
 
-    public static Table load(String metadata, Path location) {
-        switch (metadata.toLowerCase()) {
+    public static Table load(String type, Path location) {
+        switch (type.toLowerCase()) {
             case SNAPSHOTS:
                 return new SnapshotsTable(location);
             case OPTIONS:
                 return new OptionsTable(location);
             case SCHEMAS:
                 return new SchemasTable(location);
+            case AUDIT_LOG:
+                return new AuditLogTable(FileStoreTableFactory.create(location));
             default:
-                throw new UnsupportedOperationException(
-                        "Unsupported metadata table type: " + metadata);
+                throw new UnsupportedOperationException("Unsupported system table type: " + type);
         }
     }
 }
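
With the loader renamed, all four system tables resolve through a single entry
point keyed by the suffix after the splitter. A minimal usage sketch; the
warehouse path is hypothetical, and the audit_log branch only succeeds against a
location holding a real table schema, since it goes through FileStoreTableFactory:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.table.Table;
    import org.apache.flink.table.store.table.system.SystemTableLoader;

    public class SystemTableLoaderSketch {
        public static void main(String[] args) {
            Path location = new Path("/warehouse/db/t1"); // hypothetical path
            Table snapshots = SystemTableLoader.load("snapshots", location);
            Table auditLog = SystemTableLoader.load("audit_log", location);
            System.out.println(snapshots.name() + " / " + auditLog.name());
        }
    }
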
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 81648141..e1cca63f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 
@@ -203,7 +203,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         Predicate partitionFilter =
                 new PredicateBuilder(table.schema().logicalRowType()).equal(0, partition);
         List<Split> splits =
-                ((DataTableScan) table.newScan())
+                ((AbstractDataTableScan) table.newScan())
                         .withFilter(partitionFilter)
                         .withBucket(bucket)
                         .plan()
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 34ab1a17..1d47e5c2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.WriteMode;
@@ -42,6 +44,7 @@ import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnap
 import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
 import org.apache.flink.table.store.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.store.table.system.AuditLogTable;
 import org.apache.flink.table.store.utils.CompatibilityTestUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -615,6 +618,83 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                 .isNotEqualTo(splits0.get(0).files().get(0).fileName());
     }
 
+    @Test
+    public void testAuditLog() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        // first commit
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // second commit
+        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 30, 300L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.close();
+        commit.close();
+
+        AuditLogTable auditLogTable = new AuditLogTable(table);
+        RowRowConverter converter =
+                RowRowConverter.create(
+                        DataTypes.ROW(
+                                DataTypes.STRING(),
+                                DataTypes.INT(),
+                                DataTypes.INT(),
+                                DataTypes.BIGINT()));
+        Function<RowData, String> rowDataToString = row -> converter.toExternal(row).toString();
+        PredicateBuilder predicateBuilder = new PredicateBuilder(auditLogTable.rowType());
+
+        // Read all
+        TableScan scan = auditLogTable.newScan();
+        TableRead read = auditLogTable.newRead();
+        List<String> result = getResult(read, scan.plan().splits(), rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");
+
+        // Read with a filter on row kind (no effect: row kind filters are not pushed down)
+        Predicate rowKindEqual = predicateBuilder.equal(0, StringData.fromString("+I"));
+        scan = auditLogTable.newScan().withFilter(rowKindEqual);
+        read = auditLogTable.newRead().withFilter(rowKindEqual);
+        result = getResult(read, scan.plan().splits(), rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");
+
+        // Read by filter
+        scan = auditLogTable.newScan().withFilter(predicateBuilder.equal(2, 10));
+        read = auditLogTable.newRead().withFilter(predicateBuilder.equal(2, 10));
+        result = getResult(read, scan.plan().splits(), rowDataToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[+I, 1, 10, 100]");
+
+        // Read by projection
+        scan = auditLogTable.newScan();
+        read = auditLogTable.newRead().withProjection(new int[] {2, 0, 1});
+        RowRowConverter projConverter1 =
+                RowRowConverter.create(
+                        DataTypes.ROW(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()));
+        Function<RowData, String> projectToString1 =
+                row -> projConverter1.toExternal(row).toString();
+        result = getResult(read, scan.plan().splits(), projectToString1);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+I[20, +I, 2]", "+I[30, +U, 1]", "+I[10, +I, 1]");
+
+        // Read by projection without row kind
+        scan = auditLogTable.newScan();
+        read = auditLogTable.newRead().withProjection(new int[] {2, 1});
+        RowRowConverter projConverter2 =
+                RowRowConverter.create(DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
+        Function<RowData, String> projectToString2 =
+                row -> projConverter2.toExternal(row).toString();
+        result = getResult(read, scan.plan().splits(), projectToString2);
+        assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
+    }
+
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {
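
The test above drives the Java API directly. From SQL the same data should be
reachable through the splitter naming convention that the Spark test below
already uses for `t1$snapshots`; the snippet is a hedged sketch, since catalog
registration is environment-specific and the exact SQL wiring is not shown in
this excerpt:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AuditLogSqlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            // Assumes a table store catalog with a table `t1` is registered.
            tEnv.executeSql("SELECT * FROM `t1$audit_log`").print();
        }
    }
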
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 4903c59c..1e3e35d3 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -78,7 +78,7 @@ public class SparkReadITCase extends SparkReadTestBase {
     }
 
     @Test
-    public void testMetadataTable() {
+    public void testSnapshotsTable() {
         List<Row> rows =
                 spark.table("tablestore.default.`t1$snapshots`")
                         .select("snapshot_id", "schema_id", "commit_user", "commit_kind")