You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/12/12 08:15:00 UTC
[flink-table-store] branch master updated: [FLINK-30341] Introduce audit_log system table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 81f4efc1 [FLINK-30341] Introduce audit_log system table
81f4efc1 is described below
commit 81f4efc101620a134601c69a3b873f2a8d39fb70
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Dec 12 16:14:54 2022 +0800
[FLINK-30341] Introduce audit_log system table
This closes #428
---
.../flink/table/store/utils/ProjectedRowData.java | 7 +-
.../store/connector/AbstractTableStoreFactory.java | 12 +-
.../flink/table/store/connector/FlinkCatalog.java | 2 +-
...taCatalogTable.java => SystemCatalogTable.java} | 8 +-
.../source/ContinuousFileStoreSource.java | 13 +-
.../store/connector/source/FlinkSourceBuilder.java | 8 +-
...MetadataSource.java => SimpleSystemSource.java} | 18 +-
.../connector/source/StaticFileStoreSource.java | 6 +-
...dataTableSource.java => SystemTableSource.java} | 32 ++-
.../table/store/connector/CatalogITCaseBase.java | 39 ++-
.../table/store/connector/CatalogTableITCase.java | 6 +-
.../connector/FullCompactionFileStoreITCase.java | 63 ++++-
.../source/FileStoreSourceSplitGeneratorTest.java | 3 +-
.../org/apache/flink/table/store/CoreOptions.java | 4 +
.../table/store/file/catalog/AbstractCatalog.java | 16 +-
.../flink/table/store/file/catalog/Catalog.java | 2 +-
.../store/table/AppendOnlyFileStoreTable.java | 6 +-
.../table/ChangelogValueCountFileStoreTable.java | 6 +-
.../table/ChangelogWithKeyFileStoreTable.java | 6 +-
.../table/{FileStoreTable.java => DataTable.java} | 42 +--
.../flink/table/store/table/FileStoreTable.java | 15 +-
...taTableScan.java => AbstractDataTableScan.java} | 38 +--
.../table/store/table/source/DataTableScan.java | 137 +---------
.../ContinuousDataFileSnapshotEnumerator.java | 10 +-
.../snapshot/StaticDataFileSnapshotEnumerator.java | 5 +-
.../table/store/table/system/AuditLogTable.java | 282 +++++++++++++++++++++
.../table/{metadata => system}/OptionsTable.java | 6 +-
.../table/{metadata => system}/SchemasTable.java | 6 +-
.../table/{metadata => system}/SnapshotsTable.java | 6 +-
.../SystemTableLoader.java} | 23 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 4 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 80 ++++++
.../flink/table/store/spark/SparkReadITCase.java | 2 +-
33 files changed, 594 insertions(+), 319 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
index 9f45ec88..35a71bfb 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
@@ -42,11 +42,11 @@ import java.util.Arrays;
*/
public class ProjectedRowData implements RowData {
- private final int[] indexMapping;
+ protected final int[] indexMapping;
- private RowData row;
+ protected RowData row;
- private ProjectedRowData(int[] indexMapping) {
+ protected ProjectedRowData(int[] indexMapping) {
this.indexMapping = indexMapping;
}
@@ -81,6 +81,7 @@ public class ProjectedRowData implements RowData {
@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
+ // TODO move this logical to hive
return true;
}
return row.isNullAt(indexMapping[pos]);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index eb638104..082aa959 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
-import org.apache.flink.table.store.connector.source.MetadataTableSource;
+import org.apache.flink.table.store.connector.source.SystemTableSource;
import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -64,14 +64,16 @@ public abstract class AbstractTableStoreFactory
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
- if (origin instanceof MetadataCatalogTable) {
- return new MetadataTableSource(((MetadataCatalogTable) origin).table());
+ boolean isStreamingMode =
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
+ if (origin instanceof SystemCatalogTable) {
+ return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode);
}
return new TableStoreSource(
context.getObjectIdentifier(),
buildFileStoreTable(context),
- context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
- == RuntimeExecutionMode.STREAMING,
+ isStreamingMode,
context,
createOptionalLogStoreFactory(context).orElse(null));
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index d3ada964..253714db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -164,7 +164,7 @@ public class FlinkCatalog extends AbstractCatalog {
.put(PATH.key(), catalog.getTableLocation(tablePath).toString());
return catalogTable;
} else {
- return new MetadataCatalogTable(table);
+ return new SystemCatalogTable(table);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
similarity index 90%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
index 43ceb72e..b7b3d892 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/MetadataCatalogTable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/SystemCatalogTable.java
@@ -28,12 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-/** A {@link CatalogTable} to represent metadata table. */
-public class MetadataCatalogTable implements CatalogTable {
+/** A {@link CatalogTable} to represent system table. */
+public class SystemCatalogTable implements CatalogTable {
private final Table table;
- public MetadataCatalogTable(Table table) {
+ public SystemCatalogTable(Table table) {
this.table = table;
}
@@ -60,7 +60,7 @@ public class MetadataCatalogTable implements CatalogTable {
@Override
public CatalogTable copy(Map<String, String> map) {
- return new MetadataCatalogTable(table.copy(map));
+ return new SystemCatalogTable(table.copy(map));
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index bf9dd90d..00c52d16 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
@@ -34,20 +34,17 @@ import java.util.Collection;
/** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
public class ContinuousFileStoreSource extends FlinkSource {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
- private final FileStoreTable table;
- private final long discoveryInterval;
+ private final DataTable table;
public ContinuousFileStoreSource(
- FileStoreTable table,
- long discoveryInterval,
+ DataTable table,
@Nullable int[][] projectedFields,
@Nullable Predicate predicate,
@Nullable Long limit) {
super(table, projectedFields, predicate, limit);
this.table = table;
- this.discoveryInterval = discoveryInterval;
}
@Override
@@ -75,7 +72,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
context,
splits,
nextSnapshotId,
- discoveryInterval,
+ table.options().continuousDiscoveryInterval().toMillis(),
ContinuousDataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 97217cee..bb007e26 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -42,7 +42,6 @@ import javax.annotation.Nullable;
import java.util.Optional;
-import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
/** Source builder to build a Flink {@link Source}. */
@@ -108,17 +107,12 @@ public class FlinkSourceBuilder {
return this;
}
- private long discoveryIntervalMills() {
- return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
- }
-
private StaticFileStoreSource buildStaticFileSource() {
return new StaticFileStoreSource(table, projectedFields, predicate, limit);
}
private ContinuousFileStoreSource buildContinuousFileSource() {
- return new ContinuousFileStoreSource(
- table, discoveryIntervalMills(), projectedFields, predicate, limit);
+ return new ContinuousFileStoreSource(table, projectedFields, predicate, limit);
}
private Source<RowData, ?, ?> buildSource() {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
similarity index 79%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
index 9155c27c..0925eabe 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SimpleSystemSource.java
@@ -27,12 +27,14 @@ import org.apache.flink.table.store.table.source.TableScan;
import javax.annotation.Nullable;
-/** A {@link FlinkSource} for metadata table. */
-public class MetadataSource extends FlinkSource {
+import java.util.Collection;
- private static final long serialVersionUID = 1L;
+/** A {@link FlinkSource} for system table. */
+public class SimpleSystemSource extends FlinkSource {
- public MetadataSource(
+ private static final long serialVersionUID = 2L;
+
+ public SimpleSystemSource(
Table table,
@Nullable int[][] projectedFields,
@Nullable Predicate predicate,
@@ -54,7 +56,11 @@ public class MetadataSource extends FlinkSource {
scan.withFilter(predicate);
}
- return new StaticFileStoreSplitEnumerator(
- context, null, new FileStoreSourceSplitGenerator().createSplits(scan.plan()));
+ Collection<FileStoreSourceSplit> splits =
+ checkpoint == null
+ ? new FileStoreSourceSplitGenerator().createSplits(scan.plan())
+ : checkpoint.splits();
+
+ return new StaticFileStoreSplitEnumerator(context, null, splits);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index e3f67955..c479c322 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;
@@ -39,10 +39,10 @@ public class StaticFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 1L;
- private final FileStoreTable table;
+ private final DataTable table;
public StaticFileStoreSource(
- FileStoreTable table,
+ DataTable table,
@Nullable int[][] projectedFields,
@Nullable Predicate predicate,
@Nullable Long limit) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
similarity index 59%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
index 63941196..45bf4854 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/MetadataTableSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/SystemTableSource.java
@@ -18,31 +18,38 @@
package org.apache.flink.table.store.connector.source;
+import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.Table;
import javax.annotation.Nullable;
-/** A {@link FlinkTableSource} for metadata table. */
-public class MetadataTableSource extends FlinkTableSource {
+/** A {@link FlinkTableSource} for system table. */
+public class SystemTableSource extends FlinkTableSource {
private final Table table;
+ private final boolean isStreamingMode;
- public MetadataTableSource(Table table) {
+ public SystemTableSource(Table table, boolean isStreamingMode) {
super(table);
this.table = table;
+ this.isStreamingMode = isStreamingMode;
}
- public MetadataTableSource(
+ public SystemTableSource(
Table table,
+ boolean isStreamingMode,
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
@Nullable Long limit) {
super(table, predicate, projectFields, limit);
this.table = table;
+ this.isStreamingMode = isStreamingMode;
}
@Override
@@ -52,16 +59,27 @@ public class MetadataTableSource extends FlinkTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- return SourceProvider.of(new MetadataSource(table, projectFields, predicate, limit));
+ Source<RowData, ?, ?> source;
+ if (table instanceof DataTable) {
+ DataTable dataTable = (DataTable) table;
+ source =
+ isStreamingMode
+ ? new ContinuousFileStoreSource(
+ dataTable, projectFields, predicate, limit)
+ : new StaticFileStoreSource(dataTable, projectFields, predicate, limit);
+ } else {
+ source = new SimpleSystemSource(table, projectFields, predicate, limit);
+ }
+ return SourceProvider.of(source);
}
@Override
public DynamicTableSource copy() {
- return new MetadataTableSource(table, predicate, projectFields, limit);
+ return new SystemTableSource(table, isStreamingMode, predicate, projectFields, limit);
}
@Override
public String asSummaryString() {
- return "TableStore-MetadataSource";
+ return "TableStore-SystemTable-Source";
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index c705e21f..17d066cd 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -18,8 +18,10 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
@@ -34,24 +36,49 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.junit.Before;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
/** ITCase for catalog. */
public abstract class CatalogITCaseBase extends AbstractTestBase {
protected TableEnvironment tEnv;
+ protected TableEnvironment sEnv;
@Before
public void before() throws IOException {
tEnv =
TableEnvironmentTestUtils.create(
EnvironmentSettings.newInstance().inBatchMode().build());
+ String catalog = "TABLE_STORE";
tEnv.executeSql(
String.format(
- "CREATE CATALOG TABLE_STORE WITH ("
- + "'type'='table-store', 'warehouse'='%s')",
- TEMPORARY_FOLDER.newFolder().toURI()));
- tEnv.useCatalog("TABLE_STORE");
+ "CREATE CATALOG %s WITH (" + "'type'='table-store', 'warehouse'='%s')",
+ catalog, TEMPORARY_FOLDER.newFolder().toURI()));
+ tEnv.useCatalog(catalog);
+
+ sEnv =
+ TableEnvironmentTestUtils.create(
+ EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+ sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+ sEnv.useCatalog(catalog);
+
+ prepareConfiguration(tEnv);
+ prepareConfiguration(sEnv);
+ }
+
+ private void prepareConfiguration(TableEnvironment env) {
+ Configuration config = env.getConfig().getConfiguration();
+ config.set(
+ ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+ defaultParallelism());
+ }
+
+ protected int defaultParallelism() {
+ return 2;
}
protected List<Row> sql(String query, Object... args) throws Exception {
@@ -60,6 +87,10 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
}
}
+ protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
+ return sEnv.executeSql(String.format(query, args)).collect();
+ }
+
protected CatalogTable table(String tableName) throws TableNotExistException {
Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
CatalogBaseTable table =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
index 9aa1d8bb..f2f652d5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
@@ -25,7 +25,7 @@ import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -64,12 +64,12 @@ public class CatalogTableITCase extends CatalogITCaseBase {
.hasRootCauseMessage(
String.format(
"Table name[%s] cannot contain '%s' separator",
- "T$snapshots", METADATA_TABLE_SPLITTER));
+ "T$snapshots", SYSTEM_TABLE_SPLITTER));
assertThatThrownBy(() -> sql("CREATE TABLE T$aa$bb (a INT, b INT)"))
.hasRootCauseMessage(
String.format(
"Table name[%s] cannot contain '%s' separator",
- "T$aa$bb", METADATA_TABLE_SPLITTER));
+ "T$aa$bb", SYSTEM_TABLE_SPLITTER));
}
@Test
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
index 81d7f3bc..47fb004e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FullCompactionFileStoreITCase.java
@@ -20,23 +20,26 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.Before;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.List;
+import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
/** SQL ITCase for continuous file store. */
-public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
+public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
private final String table = "T";
@Override
- protected List<String> ddl() {
+ @Before
+ public void before() throws IOException {
+ super.before();
String options =
" WITH('changelog-producer'='full-compaction', 'changelog-producer.compaction-interval' = '1s')";
- return Arrays.asList(
+ tEnv.executeSql(
"CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ options);
}
@@ -46,11 +49,11 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
- batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
- batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
}
@@ -61,15 +64,55 @@ public class FullCompactionFileStoreITCase extends FileStoreTableITCase {
streamSqlIter(
"SELECT * FROM %s /*+ OPTIONS('scan.mode'='compacted') */", table));
- batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ sql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
- batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ sql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
- assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
+ assertThat(sql("SELECT * FROM T /*+ OPTIONS('scan.mode'='compacted') */"))
.containsExactlyInAnyOrder(
Row.of("1", "2", "3"), Row.of("4", "5", "6"), Row.of("7", "8", "9"));
}
+
+ @Test
+ public void testUpdate() throws Exception {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+ sql("INSERT INTO %s VALUES ('1', '2', '3')", table);
+ assertThat(iterator.collect(1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "2", "3"));
+
+ sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.UPDATE_BEFORE, "1", "2", "3"),
+ Row.ofKind(RowKind.UPDATE_AFTER, "1", "4", "5"));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testUpdateAuditLog() throws Exception {
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM %s$audit_log", table));
+
+ sql("INSERT INTO %s VALUES ('1', '2', '3')", table);
+ assertThat(iterator.collect(1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "2", "3"));
+
+ sql("INSERT INTO %s VALUES ('1', '4', '5')", table);
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, "-U", "1", "2", "3"),
+ Row.ofKind(RowKind.INSERT, "+U", "1", "4", "5"));
+
+ iterator.close();
+
+ // BATCH mode
+ assertThat(sql("SELECT * FROM %s$audit_log", table))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 9e30c242..00ccf65f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
@@ -76,7 +77,7 @@ public class FileStoreSourceSplitGeneratorTest {
}
};
List<DataSplit> scanSplits =
- DataTableScan.generateSplits(
+ AbstractDataTableScan.generateSplits(
false,
Collections::singletonList,
plan.groupByPartFiles(plan.files(FileKind.ADD)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index f4512740..aa8d40f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -466,6 +466,10 @@ public class CoreOptions implements Serializable {
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
}
+ public Duration continuousDiscoveryInterval() {
+ return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
+ }
+
public int localSortMaxNumFileHandles() {
return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
index 6cbbdcf0..4b7a7b70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.Table;
-import org.apache.flink.table.store.table.metadata.MetadataTableLoader;
+import org.apache.flink.table.store.table.system.SystemTableLoader;
import org.apache.commons.lang3.StringUtils;
@@ -34,11 +34,11 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Path getTableLocation(ObjectPath tablePath) {
- if (tablePath.getObjectName().contains(METADATA_TABLE_SPLITTER)) {
+ if (tablePath.getObjectName().contains(SYSTEM_TABLE_SPLITTER)) {
throw new IllegalArgumentException(
String.format(
"Table name[%s] cannot contain '%s' separator",
- tablePath.getObjectName(), METADATA_TABLE_SPLITTER));
+ tablePath.getObjectName(), SYSTEM_TABLE_SPLITTER));
}
return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
}
@@ -46,21 +46,21 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Table getTable(ObjectPath tablePath) throws TableNotExistException {
String inputTableName = tablePath.getObjectName();
- if (inputTableName.contains(METADATA_TABLE_SPLITTER)) {
- String[] splits = StringUtils.split(inputTableName, METADATA_TABLE_SPLITTER);
+ if (inputTableName.contains(SYSTEM_TABLE_SPLITTER)) {
+ String[] splits = StringUtils.split(inputTableName, SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
- "Metadata table can only contain one '$' separator, but this is: "
+ "System table can only contain one '$' separator, but this is: "
+ inputTableName);
}
String table = splits[0];
- String metadata = splits[1];
+ String type = splits[1];
ObjectPath originTablePath = new ObjectPath(tablePath.getDatabaseName(), table);
if (!tableExists(originTablePath)) {
throw new TableNotExistException(tablePath);
}
Path location = getTableLocation(originTablePath);
- return MetadataTableLoader.load(metadata, location);
+ return SystemTableLoader.load(type, location);
} else {
TableSchema tableSchema = getTableSchema(tablePath);
return FileStoreTableFactory.create(getTableLocation(tablePath), tableSchema);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index a29977cf..919f6067 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -36,7 +36,7 @@ public interface Catalog extends AutoCloseable {
String DEFAULT_DATABASE = "default";
- String METADATA_TABLE_SPLITTER = "$";
+ String SYSTEM_TABLE_SPLITTER = "$";
/**
* Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 52dd03f6..04adebfd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -32,9 +32,9 @@ import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
import org.apache.flink.table.store.table.source.DataSplit;
-import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
@@ -75,9 +75,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public DataTableScan newScan() {
+ public AbstractDataTableScan newScan() {
AppendOnlyFileStoreScan scan = store().newScan();
- return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+ return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new AppendOnlySplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index b6ba6111..0da5c85b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -38,7 +38,7 @@ import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -90,9 +90,9 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public DataTableScan newScan() {
+ public AbstractDataTableScan newScan() {
KeyValueFileStoreScan scan = store().newScan();
- return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+ return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 803535c4..f92cfd07 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -41,7 +41,7 @@ import org.apache.flink.table.store.table.sink.SequenceGenerator;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -146,9 +146,9 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- public DataTableScan newScan() {
+ public AbstractDataTableScan newScan() {
KeyValueFileStoreScan scan = store().newScan();
- return new DataTableScan(scan, tableSchema, store().pathFactory(), options()) {
+ return new AbstractDataTableScan(scan, tableSchema, store().pathFactory(), options()) {
@Override
protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
similarity index 55%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
index b806ea2d..60e62685 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/DataTable.java
@@ -20,52 +20,18 @@ package org.apache.flink.table.store.table;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.types.logical.RowType;
-import java.util.List;
-import java.util.Map;
+/** A {@link Table} for data. */
+public interface DataTable extends Table {
-/**
- * An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
- * and writing of {@link org.apache.flink.table.data.RowData}.
- */
-public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
+ @Override
+ DataTableScan newScan();
CoreOptions options();
Path location();
- @Override
- default String name() {
- return location().getName();
- }
-
- @Override
- default RowType rowType() {
- return schema().logicalRowType();
- }
-
- @Override
- default List<String> partitionKeys() {
- return schema().partitionKeys();
- }
-
- TableSchema schema();
-
SnapshotManager snapshotManager();
-
- @Override
- FileStoreTable copy(Map<String, String> dynamicOptions);
-
- @Override
- DataTableScan newScan();
-
- @Override
- default BucketComputer bucketComputer() {
- return new BucketComputer(schema());
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index b806ea2d..877a5690 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,12 +18,8 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.BucketComputer;
-import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.types.logical.RowType;
import java.util.List;
@@ -33,11 +29,7 @@ import java.util.Map;
* An abstraction layer above {@link org.apache.flink.table.store.file.FileStore} to provide reading
* and writing of {@link org.apache.flink.table.data.RowData}.
*/
-public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite {
-
- CoreOptions options();
-
- Path location();
+public interface FileStoreTable extends DataTable, SupportsPartition, SupportsWrite {
@Override
default String name() {
@@ -56,14 +48,9 @@ public interface FileStoreTable extends Table, SupportsPartition, SupportsWrite
TableSchema schema();
- SnapshotManager snapshotManager();
-
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
- @Override
- DataTableScan newScan();
-
@Override
default BucketComputer bucketComputer() {
return new BucketComputer(schema());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
similarity index 85%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
index 73e23986..c545aeca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/AbstractDataTableScan.java
@@ -31,8 +31,6 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,7 +39,7 @@ import java.util.Optional;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class DataTableScan implements TableScan {
+public abstract class AbstractDataTableScan implements DataTableScan {
private final FileStoreScan scan;
private final TableSchema tableSchema;
@@ -50,7 +48,7 @@ public abstract class DataTableScan implements TableScan {
private ScanKind scanKind = ScanKind.ALL;
- protected DataTableScan(
+ protected AbstractDataTableScan(
FileStoreScan scan,
TableSchema tableSchema,
FileStorePathFactory pathFactory,
@@ -61,13 +59,14 @@ public abstract class DataTableScan implements TableScan {
this.options = options;
}
- public DataTableScan withSnapshot(long snapshotId) {
+ @Override
+ public AbstractDataTableScan withSnapshot(long snapshotId) {
scan.withSnapshot(snapshotId);
return this;
}
@Override
- public DataTableScan withFilter(Predicate predicate) {
+ public AbstractDataTableScan withFilter(Predicate predicate) {
List<String> partitionKeys = tableSchema.partitionKeys();
int[] fieldIdxToPartitionIdx =
tableSchema.fields().stream()
@@ -94,19 +93,21 @@ public abstract class DataTableScan implements TableScan {
return this;
}
- public DataTableScan withKind(ScanKind scanKind) {
+ @Override
+ public AbstractDataTableScan withKind(ScanKind scanKind) {
this.scanKind = scanKind;
scan.withKind(scanKind);
return this;
}
- public DataTableScan withLevel(int level) {
+ @Override
+ public AbstractDataTableScan withLevel(int level) {
scan.withLevel(level);
return this;
}
@VisibleForTesting
- public DataTableScan withBucket(int bucket) {
+ public AbstractDataTableScan withBucket(int bucket) {
scan.withBucket(bucket);
return this;
}
@@ -160,23 +161,4 @@ public abstract class DataTableScan implements TableScan {
public SnapshotManager snapshotManager() {
return new SnapshotManager(pathFactory.root());
}
-
- /** Scanning plan containing snapshot ID and input splits. */
- public static class DataFilePlan implements Plan {
-
- @Nullable public final Long snapshotId;
- public final List<DataSplit> splits;
-
- @VisibleForTesting
- public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
- this.snapshotId = snapshotId;
- this.splits = splits;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public List<Split> splits() {
- return (List) splits;
- }
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index 73e23986..5509929f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -19,150 +19,29 @@
package org.apache.flink.table.store.table.source;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.FileKind;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.ScanKind;
import org.apache.flink.table.store.file.predicate.Predicate;
-import org.apache.flink.table.store.file.predicate.PredicateBuilder;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+/** A {@link TableScan} for reading data. */
+public interface DataTableScan extends TableScan {
-/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
-public abstract class DataTableScan implements TableScan {
+ DataTableScan withKind(ScanKind kind);
- private final FileStoreScan scan;
- private final TableSchema tableSchema;
- private final FileStorePathFactory pathFactory;
- private final CoreOptions options;
+ DataTableScan withSnapshot(long snapshotId);
- private ScanKind scanKind = ScanKind.ALL;
-
- protected DataTableScan(
- FileStoreScan scan,
- TableSchema tableSchema,
- FileStorePathFactory pathFactory,
- CoreOptions options) {
- this.scan = scan;
- this.tableSchema = tableSchema;
- this.pathFactory = pathFactory;
- this.options = options;
- }
-
- public DataTableScan withSnapshot(long snapshotId) {
- scan.withSnapshot(snapshotId);
- return this;
- }
+ DataTableScan withLevel(int level);
@Override
- public DataTableScan withFilter(Predicate predicate) {
- List<String> partitionKeys = tableSchema.partitionKeys();
- int[] fieldIdxToPartitionIdx =
- tableSchema.fields().stream()
- .mapToInt(f -> partitionKeys.indexOf(f.name()))
- .toArray();
+ DataTableScan withFilter(Predicate predicate);
- List<Predicate> partitionFilters = new ArrayList<>();
- List<Predicate> nonPartitionFilters = new ArrayList<>();
- for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
- Optional<Predicate> mapped = transformFieldMapping(p, fieldIdxToPartitionIdx);
- if (mapped.isPresent()) {
- partitionFilters.add(mapped.get());
- } else {
- nonPartitionFilters.add(p);
- }
- }
-
- if (partitionFilters.size() > 0) {
- scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
- }
- if (nonPartitionFilters.size() > 0) {
- withNonPartitionFilter(PredicateBuilder.and(nonPartitionFilters));
- }
- return this;
- }
-
- public DataTableScan withKind(ScanKind scanKind) {
- this.scanKind = scanKind;
- scan.withKind(scanKind);
- return this;
- }
-
- public DataTableScan withLevel(int level) {
- scan.withLevel(level);
- return this;
- }
-
- @VisibleForTesting
- public DataTableScan withBucket(int bucket) {
- scan.withBucket(bucket);
- return this;
- }
-
- @Override
- public DataFilePlan plan() {
- FileStoreScan.Plan plan = scan.plan();
- return new DataFilePlan(
- plan.snapshotId(), generateSplits(plan.groupByPartFiles(plan.files(FileKind.ADD))));
- }
-
- private List<DataSplit> generateSplits(
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- return generateSplits(
- scanKind != ScanKind.ALL, splitGenerator(pathFactory), groupedDataFiles);
- }
-
- @VisibleForTesting
- public static List<DataSplit> generateSplits(
- boolean isIncremental,
- SplitGenerator splitGenerator,
- Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
- List<DataSplit> splits = new ArrayList<>();
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entry :
- groupedDataFiles.entrySet()) {
- BinaryRowData partition = entry.getKey();
- Map<Integer, List<DataFileMeta>> buckets = entry.getValue();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : buckets.entrySet()) {
- int bucket = bucketEntry.getKey();
- if (isIncremental) {
- // Don't split when incremental
- splits.add(new DataSplit(partition, bucket, bucketEntry.getValue(), true));
- } else {
- splitGenerator.split(bucketEntry.getValue()).stream()
- .map(files -> new DataSplit(partition, bucket, files, false))
- .forEach(splits::add);
- }
- }
- }
- return splits;
- }
-
- protected abstract SplitGenerator splitGenerator(FileStorePathFactory pathFactory);
-
- protected abstract void withNonPartitionFilter(Predicate predicate);
-
- public CoreOptions options() {
- return options;
- }
-
- public SnapshotManager snapshotManager() {
- return new SnapshotManager(pathFactory.root());
- }
+ DataTableScan.DataFilePlan plan();
/** Scanning plan containing snapshot ID and input splits. */
- public static class DataFilePlan implements Plan {
+ class DataFilePlan implements Plan {
@Nullable public final Long snapshotId;
public final List<DataSplit> splits;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index d1a462ef..bb9debfa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.util.Preconditions;
@@ -109,7 +109,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
// ------------------------------------------------------------------------
public static ContinuousDataFileSnapshotEnumerator createWithSnapshotStarting(
- FileStoreTable table, DataTableScan scan) {
+ DataTable table, DataTableScan scan) {
StartingScanner startingScanner =
table.options().startupMode() == CoreOptions.StartupMode.COMPACTED
? new CompactedStartingScanner()
@@ -119,7 +119,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
}
public static ContinuousDataFileSnapshotEnumerator create(
- FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
+ DataTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
return new ContinuousDataFileSnapshotEnumerator(
table.location(),
scan,
@@ -128,7 +128,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
nextSnapshotId);
}
- private static StartingScanner createStartingScanner(FileStoreTable table) {
+ private static StartingScanner createStartingScanner(DataTable table) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
Long startupMillis = table.options().logScanTimestampMills();
if (startupMode == CoreOptions.StartupMode.FULL) {
@@ -151,7 +151,7 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
}
}
- private static FollowUpScanner createFollowUpScanner(FileStoreTable table, DataTableScan scan) {
+ private static FollowUpScanner createFollowUpScanner(DataTable table, DataTableScan scan) {
CoreOptions.ChangelogProducer changelogProducer = table.options().changelogProducer();
if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
return new DeltaFollowUpScanner();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index f12e9ac1..582ba9c2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.table.source.snapshot;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.DataTable;
import org.apache.flink.table.store.table.source.DataTableScan;
import javax.annotation.Nullable;
@@ -59,8 +59,7 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
// static create methods
// ------------------------------------------------------------------------
- public static StaticDataFileSnapshotEnumerator create(
- FileStoreTable table, DataTableScan scan) {
+ public static StaticDataFileSnapshotEnumerator create(DataTable table, DataTableScan scan) {
CoreOptions.StartupMode startupMode = table.options().startupMode();
StartingScanner startingScanner;
if (startupMode == CoreOptions.StartupMode.FULL
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
new file mode 100644
index 00000000..52ba766e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.system;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.predicate.LeafPredicate;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.predicate.PredicateReplaceVisitor;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.DataTable;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.ProjectedRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.utils.RecordReaderUtils.transform;
+
+/** A {@link Table} for reading audit log of table. */
+public class AuditLogTable implements DataTable {
+
+ public static final String AUDIT_LOG = "audit_log";
+
+ public static final String ROW_KIND = "rowkind";
+
+ public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
+ p -> {
+ if (p.index() == 0) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ new LeafPredicate(
+ p.function(),
+ p.type(),
+ p.index() - 1,
+ p.fieldName(),
+ p.literals()));
+ };
+
+ private final FileStoreTable dataTable;
+
+ public AuditLogTable(FileStoreTable dataTable) {
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public String name() {
+ return dataTable.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG;
+ }
+
+ @Override
+ public RowType rowType() {
+ List<RowField> fields = new ArrayList<>();
+ fields.add(new RowField(ROW_KIND, new VarCharType(VarCharType.MAX_LENGTH)));
+ fields.addAll(dataTable.rowType().getFields());
+ return new RowType(fields);
+ }
+
+ @Override
+ public DataTableScan newScan() {
+ return new AuditLogScan(dataTable.newScan());
+ }
+
+ @Override
+ public CoreOptions options() {
+ return dataTable.options();
+ }
+
+ @Override
+ public Path location() {
+ return dataTable.location();
+ }
+
+ @Override
+ public SnapshotManager snapshotManager() {
+ return dataTable.snapshotManager();
+ }
+
+ @Override
+ public TableRead newRead() {
+ return new AuditLogRead(dataTable.newRead());
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new AuditLogTable(dataTable.copy(dynamicOptions));
+ }
+
+ /** Push down predicate to dataScan and dataRead. */
+ private Optional<Predicate> convert(Predicate predicate) {
+ List<Predicate> result =
+ PredicateBuilder.splitAnd(predicate).stream()
+ .map(p -> p.visit(PREDICATE_CONVERTER))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (result.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(PredicateBuilder.and(result));
+ }
+
+ private class AuditLogScan implements DataTableScan {
+
+ private final DataTableScan dataScan;
+
+ private AuditLogScan(DataTableScan dataScan) {
+ this.dataScan = dataScan;
+ }
+
+ @Override
+ public DataTableScan withFilter(Predicate predicate) {
+ convert(predicate).ifPresent(dataScan::withFilter);
+ return this;
+ }
+
+ @Override
+ public DataTableScan withKind(ScanKind kind) {
+ dataScan.withKind(kind);
+ return this;
+ }
+
+ @Override
+ public DataTableScan withSnapshot(long snapshotId) {
+ dataScan.withSnapshot(snapshotId);
+ return this;
+ }
+
+ @Override
+ public DataTableScan withLevel(int level) {
+ dataScan.withLevel(level);
+ return this;
+ }
+
+ @Override
+ public DataTableScan.DataFilePlan plan() {
+ return dataScan.plan();
+ }
+ }
+
+ private class AuditLogRead implements TableRead {
+
+ private final TableRead dataRead;
+
+ private int[] readProjection;
+
+ private AuditLogRead(TableRead dataRead) {
+ this.dataRead = dataRead;
+ this.readProjection = defaultProjection();
+ }
+
+ /** Default projection, just add row kind to the first. */
+ private int[] defaultProjection() {
+ int dataFieldCount = dataTable.rowType().getFieldCount();
+ int[] projection = new int[dataFieldCount + 1];
+ projection[0] = -1;
+ for (int i = 0; i < dataFieldCount; i++) {
+ projection[i + 1] = i;
+ }
+ return projection;
+ }
+
+ @Override
+ public TableRead withFilter(Predicate predicate) {
+ convert(predicate).ifPresent(dataRead::withFilter);
+ return this;
+ }
+
+ @Override
+ public TableRead withProjection(int[][] projection) {
+ // data projection to push down to dataRead
+ List<int[]> dataProjection = new ArrayList<>();
+ // read projection to handle record returned by dataRead
+ List<Integer> readProjection = new ArrayList<>();
+ boolean rowKindAppeared = false;
+ for (int i = 0; i < projection.length; i++) {
+ int[] field = projection[i];
+ int topField = field[0];
+ if (topField == 0) {
+ rowKindAppeared = true;
+ readProjection.add(-1);
+ } else {
+ int[] newField = Arrays.copyOf(field, field.length);
+ newField[0] = newField[0] - 1;
+ dataProjection.add(newField);
+
+ // There is no row kind field. Keep it as it is
+ // Row kind field has occurred, and the following fields are offset by 1
+ // position
+ readProjection.add(rowKindAppeared ? i - 1 : i);
+ }
+ }
+ this.readProjection = Ints.toArray(readProjection);
+ dataRead.withProjection(dataProjection.toArray(new int[0][]));
+ return this;
+ }
+
+ @Override
+ public RecordReader<RowData> createReader(Split split) throws IOException {
+ return transform(dataRead.createReader(split), this::convertRow);
+ }
+
+ private RowData convertRow(RowData data) {
+ return new AuditLogRowData(readProjection, data);
+ }
+ }
+
+ /** A {@link ProjectedRowData} which returns row kind when mapping index is negative. */
+ private static class AuditLogRowData extends ProjectedRowData {
+
+ private AuditLogRowData(int[] indexMapping, RowData row) {
+ super(indexMapping);
+ replaceRow(row);
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return RowKind.INSERT;
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ throw new UnsupportedOperationException(
+ "Set row kind is not supported in AuditLogRowData.");
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ if (indexMapping[pos] < 0) {
+ // row kind is always not null
+ return false;
+ }
+ return super.isNullAt(pos);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ if (indexMapping[pos] < 0) {
+ return StringData.fromString(row.getRowKind().shortString());
+ }
+ return super.getString(pos);
+ }
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
index dc906b23..c5901267 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/OptionsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/OptionsTable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
@@ -42,7 +42,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.flink.table.store.file.utils.SerializationUtils.newStringType;
/** A {@link Table} for showing options of table. */
@@ -66,7 +66,7 @@ public class OptionsTable implements Table {
@Override
public String name() {
- return location.getName() + METADATA_TABLE_SPLITTER + OPTIONS;
+ return location.getName() + SYSTEM_TABLE_SPLITTER + OPTIONS;
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
index 7128c854..30619b6f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SchemasTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SchemasTable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
@@ -46,7 +46,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
/** A {@link Table} for showing schemas of table. */
public class SchemasTable implements Table {
@@ -77,7 +77,7 @@ public class SchemasTable implements Table {
@Override
public String name() {
- return location.getName() + METADATA_TABLE_SPLITTER + SCHEMAS;
+ return location.getName() + SYSTEM_TABLE_SPLITTER + SCHEMAS;
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
similarity index 97%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
index 7d1672f8..8890431e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SnapshotsTable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
@@ -50,7 +50,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import static org.apache.flink.table.store.file.catalog.Catalog.METADATA_TABLE_SPLITTER;
+import static org.apache.flink.table.store.file.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
/** A {@link Table} for showing committing snapshots of table. */
public class SnapshotsTable implements Table {
@@ -79,7 +79,7 @@ public class SnapshotsTable implements Table {
@Override
public String name() {
- return location.getName() + METADATA_TABLE_SPLITTER + SNAPSHOTS;
+ return location.getName() + SYSTEM_TABLE_SPLITTER + SNAPSHOTS;
}
@Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
similarity index 58%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
index 0b23ada0..620ff8d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/MetadataTableLoader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/SystemTableLoader.java
@@ -16,29 +16,32 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.table.metadata;
+package org.apache.flink.table.store.table.system;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.Table;
-import static org.apache.flink.table.store.table.metadata.OptionsTable.OPTIONS;
-import static org.apache.flink.table.store.table.metadata.SchemasTable.SCHEMAS;
-import static org.apache.flink.table.store.table.metadata.SnapshotsTable.SNAPSHOTS;
+import static org.apache.flink.table.store.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.flink.table.store.table.system.OptionsTable.OPTIONS;
+import static org.apache.flink.table.store.table.system.SchemasTable.SCHEMAS;
+import static org.apache.flink.table.store.table.system.SnapshotsTable.SNAPSHOTS;
-/** Loader to load metadata {@link Table}s. */
-public class MetadataTableLoader {
+/** Loader to load system {@link Table}s. */
+public class SystemTableLoader {
- public static Table load(String metadata, Path location) {
- switch (metadata.toLowerCase()) {
+ public static Table load(String type, Path location) {
+ switch (type.toLowerCase()) {
case SNAPSHOTS:
return new SnapshotsTable(location);
case OPTIONS:
return new OptionsTable(location);
case SCHEMAS:
return new SchemasTable(location);
+ case AUDIT_LOG:
+ return new AuditLogTable(FileStoreTableFactory.create(location));
default:
- throw new UnsupportedOperationException(
- "Unsupported metadata table type: " + metadata);
+ throw new UnsupportedOperationException("Unsupported system table type: " + type);
}
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 81648141..e1cca63f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
@@ -203,7 +203,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
Predicate partitionFilter =
new PredicateBuilder(table.schema().logicalRowType()).equal(0, partition);
List<Split> splits =
- ((DataTableScan) table.newScan())
+ ((AbstractDataTableScan) table.newScan())
.withFilter(partitionFilter)
.withBucket(bucket)
.plan()
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 34ab1a17..1d47e5c2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.WriteMode;
@@ -42,6 +44,7 @@ import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnap
import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
import org.apache.flink.table.store.table.source.snapshot.InputChangelogFollowUpScanner;
import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.store.table.system.AuditLogTable;
import org.apache.flink.table.store.utils.CompatibilityTestUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -615,6 +618,83 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
.isNotEqualTo(splits0.get(0).files().get(0).fileName());
}
+ @Test
+ public void testAuditLog() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ // first commit
+ write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
+ write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // second commit
+ write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 30, 300L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.close();
+ commit.close();
+
+ AuditLogTable auditLogTable = new AuditLogTable(table);
+ RowRowConverter converter =
+ RowRowConverter.create(
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()));
+ Function<RowData, String> rowDataToString = row -> converter.toExternal(row).toString();
+ PredicateBuilder predicateBuilder = new PredicateBuilder(auditLogTable.rowType());
+
+ // Read all
+ TableScan scan = auditLogTable.newScan();
+ TableRead read = auditLogTable.newRead();
+ List<String> result = getResult(read, scan.plan().splits(), rowDataToString);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");
+
+ // Read by filter row kind (No effect)
+ Predicate rowKindEqual = predicateBuilder.equal(0, StringData.fromString("+I"));
+ scan = auditLogTable.newScan().withFilter(rowKindEqual);
+ read = auditLogTable.newRead().withFilter(rowKindEqual);
+ result = getResult(read, scan.plan().splits(), rowDataToString);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "+I[+I, 2, 20, 200]", "+I[+U, 1, 30, 300]", "+I[+I, 1, 10, 100]");
+
+ // Read by filter
+ scan = auditLogTable.newScan().withFilter(predicateBuilder.equal(2, 10));
+ read = auditLogTable.newRead().withFilter(predicateBuilder.equal(2, 10));
+ result = getResult(read, scan.plan().splits(), rowDataToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[+I, 1, 10, 100]");
+
+ // Read by projection
+ scan = auditLogTable.newScan();
+ read = auditLogTable.newRead().withProjection(new int[] {2, 0, 1});
+ RowRowConverter projConverter1 =
+ RowRowConverter.create(
+ DataTypes.ROW(DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()));
+ Function<RowData, String> projectToString1 =
+ row -> projConverter1.toExternal(row).toString();
+ result = getResult(read, scan.plan().splits(), projectToString1);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+I[20, +I, 2]", "+I[30, +U, 1]", "+I[10, +I, 1]");
+
+ // Read by projection without row kind
+ scan = auditLogTable.newScan();
+ read = auditLogTable.newRead().withProjection(new int[] {2, 1});
+ RowRowConverter projConverter2 =
+ RowRowConverter.create(DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
+ Function<RowData, String> projectToString2 =
+ row -> projConverter2.toExternal(row).toString();
+ result = getResult(read, scan.plan().splits(), projectToString2);
+ assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
+ }
+
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 4903c59c..1e3e35d3 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -78,7 +78,7 @@ public class SparkReadITCase extends SparkReadTestBase {
}
@Test
- public void testMetadataTable() {
+ public void testSnapshotsTable() {
List<Row> rows =
spark.table("tablestore.default.`t1$snapshots`")
.select("snapshot_id", "schema_id", "commit_user", "commit_kind")