You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/15 13:16:22 UTC
[flink-table-store] branch master updated: [FLINK-26535] Introduce StoreTableSource And StoreTableSink
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 0139e9e [FLINK-26535] Introduce StoreTableSource And StoreTableSink
0139e9e is described below
commit 0139e9ef70c353bf441814b045a6372f227abcfd
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Mar 15 21:16:14 2022 +0800
[FLINK-26535] Introduce StoreTableSource And StoreTableSink
This closes #41
---
flink-table-store-connector/pom.xml | 40 ++
.../flink/table/store/connector/TableStore.java | 36 +-
.../table/store/connector/TableStoreFactory.java | 121 ++++-
.../table/store/connector/sink/TableStoreSink.java | 168 ++++++
.../store/connector/source/FileStoreSource.java | 27 +-
.../store/connector/source/TableStoreSource.java | 232 +++++++++
.../table/store/connector/CreateTableITCase.java | 238 +++++++++
.../table/store/connector/DropTableITCase.java | 242 +++++++++
.../store/connector/ReadWriteTableITCase.java | 300 +++++++++++
.../table/store/connector/ShowCreateUtil.java | 180 +++++++
.../store/connector/TableStoreFactoryTest.java | 48 +-
.../table/store/connector/TableStoreITCase.java | 576 ---------------------
.../table/store/connector/TableStoreTestBase.java | 198 +++++++
.../store/file/mergetree/MergeTreeOptions.java | 16 +
.../store/file/predicate/PredicateConverter.java | 13 +
15 files changed, 1828 insertions(+), 607 deletions(-)
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index e4e8f38..f7baacb 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -151,6 +151,46 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Tests: Hadoop required by ORC -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a8667f1..534cbe7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -108,6 +108,23 @@ public class TableStore {
return this;
}
+ public boolean partitioned() {
+ return partitions.length > 0;
+ }
+
+ public boolean valueCountMode() {
+ return primaryKeys.length == 0;
+ }
+
+ public List<String> fieldNames() {
+ return type.getFieldNames();
+ }
+
+ public List<String> partitionKeys() {
+ RowType partitionType = ProjectionUtils.project(type, partitions);
+ return partitionType.getFieldNames();
+ }
+
public SourceBuilder sourceBuilder() {
return new SourceBuilder();
}
@@ -154,7 +171,9 @@ public class TableStore {
@Nullable private int[][] projectedFields;
- @Nullable private Predicate predicate;
+ @Nullable private Predicate partitionPredicate;
+
+ @Nullable private Predicate fieldPredicate;
@Nullable private LogSourceProvider logSourceProvider;
@@ -163,8 +182,13 @@ public class TableStore {
return this;
}
- public SourceBuilder withPredicate(Predicate predicate) {
- this.predicate = predicate;
+ public SourceBuilder withPartitionPredicate(Predicate partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ return this;
+ }
+
+ public SourceBuilder withFieldPredicate(Predicate fieldPredicate) {
+ this.fieldPredicate = fieldPredicate;
return this;
}
@@ -186,7 +210,11 @@ public class TableStore {
private FileStoreSource buildFileStoreSource() {
FileStore fileStore = buildFileStore();
return new FileStoreSource(
- fileStore, primaryKeys.length == 0, projectedFields, predicate);
+ fileStore,
+ primaryKeys.length == 0,
+ projectedFields,
+ partitionPredicate,
+ fieldPredicate);
}
public Source<RowData, ?, ?> build() {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index e846a0a..731f5c3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -19,16 +19,30 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -36,17 +50,20 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
/** Default implementation of {@link ManagedTableFactory}. */
-public class TableStoreFactory implements ManagedTableFactory {
+public class TableStoreFactory
+ implements ManagedTableFactory, DynamicTableSourceFactory, DynamicTableSinkFactory {
@Override
public Map<String, String> enrichOptions(Context context) {
@@ -130,6 +147,41 @@ public class TableStoreFactory implements ManagedTableFactory {
}
@Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ boolean streaming =
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
+ Context logStoreContext = null;
+ LogStoreTableFactory logStoreTableFactory = null;
+
+ if (enableChangeTracking(context.getCatalogTable().getOptions())) {
+ logStoreContext = createLogContext(context);
+ logStoreTableFactory = createLogStoreTableFactory(context);
+ }
+ return new TableStoreSource(
+ buildTableStore(context), streaming, logStoreContext, logStoreTableFactory);
+ }
+
+ /**
+ * Creates the {@link TableStoreSink} for the table described by {@code context}.
+ *
+ * <p>The log store context and factory are only created when change tracking is enabled for
+ * the table; otherwise both are passed to the sink as {@code null}.
+ *
+ * @param context factory context carrying the catalog table and its options
+ * @return the sink for this table
+ * @throws IllegalArgumentException if the configured changelog mode matches no {@link
+ *     LogOptions.LogChangelogMode} constant
+ */
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ Map<String, String> options = context.getCatalogTable().getOptions();
+ Context logStoreContext = null;
+ LogStoreTableFactory logStoreTableFactory = null;
+ if (enableChangeTracking(options)) {
+ logStoreContext = createLogContext(context);
+ logStoreTableFactory = createLogStoreTableFactory(context);
+ }
+ // Upper-case the resolved value, not only the default: the default's toString() already
+ // needs this conversion to match an enum constant, so a user-supplied value written in
+ // the same spelling must get it too. Locale.ROOT keeps the result locale-independent.
+ // NOTE(review): assumes every constant name is the upper-cased option value - confirm
+ // against LogOptions.LogChangelogMode.
+ String changelogMode =
+ options.getOrDefault(
+ LOG_PREFIX + CHANGELOG_MODE.key(),
+ CHANGELOG_MODE.defaultValue().toString())
+ .toUpperCase(java.util.Locale.ROOT);
+ return new TableStoreSink(
+ buildTableStore(context),
+ LogOptions.LogChangelogMode.valueOf(changelogMode),
+ logStoreContext,
+ logStoreTableFactory);
+ }
+
+ @Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@@ -137,13 +189,14 @@ public class TableStoreFactory implements ManagedTableFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
+ options.addAll(MergeTreeOptions.allOptions());
options.add(CHANGE_TRACKING);
return options;
}
// ~ Tools ------------------------------------------------------------------
- private LogStoreTableFactory createLogStoreTableFactory(Context context) {
+ private static LogStoreTableFactory createLogStoreTableFactory(Context context) {
return discoverLogStoreFactory(
context.getClassLoader(),
context.getCatalogTable()
@@ -151,7 +204,7 @@ public class TableStoreFactory implements ManagedTableFactory {
.getOrDefault(LOG_SYSTEM.key(), LOG_SYSTEM.defaultValue()));
}
- private Context createLogContext(Context context) {
+ private static Context createLogContext(Context context) {
return new FactoryUtil.DefaultDynamicTableContext(
context.getObjectIdentifier(),
context.getCatalogTable()
@@ -163,20 +216,35 @@ public class TableStoreFactory implements ManagedTableFactory {
}
@VisibleForTesting
- Map<String, String> filterLogStoreOptions(Map<String, String> enrichedOptions) {
- Map<String, String> logStoreOptions = new HashMap<>();
- enrichedOptions.forEach(
- (k, v) -> {
- if (k.startsWith(LOG_PREFIX)) {
- logStoreOptions.put(k.substring(LOG_PREFIX.length()), v);
- }
- });
- return logStoreOptions;
+ static Map<String, String> filterLogStoreOptions(Map<String, String> options) {
+ return options.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(LOG_PREFIX))
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey().substring(LOG_PREFIX.length()),
+ Map.Entry::getValue));
}
- private static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
+ @VisibleForTesting
+ static Map<String, String> filterFileStoreOptions(Map<String, String> options) {
+ return options.entrySet().stream()
+ .filter(entry -> !entry.getKey().startsWith(LOG_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @VisibleForTesting
+ static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
+ Preconditions.checkArgument(
+ options.containsKey(FILE_PATH.key()),
+ String.format(
+ "Failed to create file store path. "
+ + "Please specify a root dir by setting session level configuration "
+ + "as `SET 'table-store.%s' = '...'`. "
+ + "Alternatively, you can use a per-table root dir "
+ + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
+ FILE_PATH.key(), FILE_PATH.key()));
return new Path(
- new Path(options.get(FILE_PATH.key())),
+ options.get(FILE_PATH.key()),
String.format(
"root/%s.catalog/%s.db/%s",
identifier.getCatalogName(),
@@ -184,9 +252,32 @@ public class TableStoreFactory implements ManagedTableFactory {
identifier.getObjectName()));
}
- private static boolean enableChangeTracking(Map<String, String> options) {
+ @VisibleForTesting
+ static boolean enableChangeTracking(Map<String, String> options) {
return Boolean.parseBoolean(
options.getOrDefault(
CHANGE_TRACKING.key(), CHANGE_TRACKING.defaultValue().toString()));
}
+
+ private TableStore buildTableStore(Context context) {
+ ResolvedCatalogTable catalogTable = context.getCatalogTable();
+ ResolvedSchema schema = catalogTable.getResolvedSchema();
+ RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
+ int[] primaryKeys = new int[0];
+ if (schema.getPrimaryKey().isPresent()) {
+ primaryKeys =
+ schema.getPrimaryKey().get().getColumns().stream()
+ .mapToInt(rowType.getFieldNames()::indexOf)
+ .toArray();
+ }
+ return new TableStore(
+ Configuration.fromMap(filterFileStoreOptions(catalogTable.getOptions())))
+ .withTableIdentifier(context.getObjectIdentifier())
+ .withSchema(rowType)
+ .withPrimaryKeys(primaryKeys)
+ .withPartitions(
+ catalogTable.getPartitionKeys().stream()
+ .mapToInt(rowType.getFieldNames()::indexOf)
+ .toArray());
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
new file mode 100644
index 0000000..9b442a3
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RequireCatalogLock;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Table sink to create {@link StoreSink}. */
+public class TableStoreSink
+ implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+
+ private final TableStore tableStore;
+ private final LogOptions.LogChangelogMode logChangelogMode;
+ @Nullable private final DynamicTableFactory.Context logStoreContext;
+ @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+ private Map<String, String> staticPartitions = new HashMap<>();
+ private boolean overwrite = false;
+ @Nullable private CatalogLock.Factory lockFactory;
+
+ public TableStoreSink(
+ TableStore tableStore,
+ LogOptions.LogChangelogMode logChangelogMode,
+ @Nullable DynamicTableFactory.Context logStoreContext,
+ @Nullable LogStoreTableFactory logStoreTableFactory) {
+ this.tableStore = tableStore;
+ this.logChangelogMode = logChangelogMode;
+ this.logStoreContext = logStoreContext;
+ this.logStoreTableFactory = logStoreTableFactory;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if (!tableStore.valueCountMode()
+ && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ return builder.build();
+ }
+ return requestedMode;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ LogSinkProvider logSinkProvider = null;
+ if (logStoreTableFactory != null) {
+ logSinkProvider =
+ logStoreTableFactory.createSinkProvider(
+ logStoreContext,
+ new LogStoreTableFactory.SinkContext() {
+ @Override
+ public boolean isBounded() {
+ return context.isBounded();
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ DataType consumedDataType) {
+ return context.createTypeInformation(consumedDataType);
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType consumedLogicalType) {
+ return context.createTypeInformation(consumedLogicalType);
+ }
+
+ @Override
+ public DynamicTableSink.DataStructureConverter
+ createDataStructureConverter(DataType consumedDataType) {
+ return context.createDataStructureConverter(consumedDataType);
+ }
+ });
+ }
+ final LogSinkProvider finalLogSinkProvider = logSinkProvider;
+ return (DataStreamSinkProvider)
+ (providerContext, dataStream) ->
+ tableStore
+ .sinkBuilder()
+ .withInput(
+ new DataStream<>(
+ dataStream.getExecutionEnvironment(),
+ dataStream.getTransformation()))
+ .withLockFactory(lockFactory)
+ .withLogSinkProvider(finalLogSinkProvider)
+ .withOverwritePartition(overwrite ? staticPartitions : null)
+ .build();
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ TableStoreSink copied =
+ new TableStoreSink(
+ tableStore, logChangelogMode, logStoreContext, logStoreTableFactory);
+ copied.staticPartitions = new HashMap<>(staticPartitions);
+ copied.overwrite = overwrite;
+ copied.lockFactory = lockFactory;
+ return copied;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "TableStoreSink";
+ }
+
+ @Override
+ public void applyStaticPartition(Map<String, String> partition) {
+ tableStore
+ .partitionKeys()
+ .forEach(
+ partitionKey -> {
+ if (partition.containsKey(partitionKey)) {
+ this.staticPartitions.put(
+ partitionKey, partition.get(partitionKey));
+ }
+ });
+ }
+
+ @Override
+ public void applyOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ }
+
+ @Override
+ public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
+ this.lockFactory = lockFactory;
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index b595df5..6a72c55 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -47,17 +47,21 @@ public class FileStoreSource
@Nullable private final int[][] projectedFields;
- @Nullable private final Predicate predicate;
+ @Nullable private final Predicate partitionPredicate;
+
+ @Nullable private final Predicate fieldPredicate;
+ /**
+ * Creates a source that scans {@code fileStore}.
+ *
+ * @param fileStore file store to scan
+ * @param valueCountMode when {@code true}, {@code fieldPredicate} is applied to the scan as a
+ *     key filter; otherwise it is applied as a value filter
+ * @param projectedFields projection to apply, or {@code null} if none
+ * @param partitionPredicate partition filter for the scan, or {@code null} for no filter
+ * @param fieldPredicate key/value filter for the scan, or {@code null} for no filter
+ */
public FileStoreSource(
FileStore fileStore,
boolean valueCountMode,
@Nullable int[][] projectedFields,
- @Nullable Predicate predicate) {
+ @Nullable Predicate partitionPredicate,
+ @Nullable Predicate fieldPredicate) {
this.fileStore = fileStore;
this.valueCountMode = valueCountMode;
this.projectedFields = projectedFields;
- this.predicate = predicate;
+ this.partitionPredicate = partitionPredicate;
+ this.fieldPredicate = fieldPredicate;
}
@Override
@@ -84,14 +88,15 @@ public class FileStoreSource
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
FileStoreScan scan = fileStore.newScan();
- if (predicate != null) {
- // TODO split predicate into partitionPredicate and fieldsPredicate
- // scan.withPartitionFilter(partitionPredicate);
- // if (keyAsRecord) {
- // scan.withKeyFilter(fieldsPredicate);
- // } else {
- // scan.withValueFilter(fieldsPredicate);
- // }
+ if (partitionPredicate != null) {
+ scan.withPartitionFilter(partitionPredicate);
+ }
+ if (fieldPredicate != null) {
+ if (valueCountMode) {
+ scan.withKeyFilter(fieldPredicate);
+ } else {
+ scan.withValueFilter(fieldPredicate);
+ }
}
return new StaticFileStoreSplitEnumerator(context, scan);
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
new file mode 100644
index 0000000..66c2da2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.TableStore;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Table source to create {@link FileStoreSource} in batch mode or when change-tracking is
+ * disabled. For streaming mode with change-tracking enabled and FULL scan mode, it will create a
+ * {@link org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link FileStoreSource}
+ * and the Kafka log source created by {@link LogSourceProvider}.
+ */
+public class TableStoreSource
+ implements ScanTableSource, SupportsFilterPushDown, SupportsProjectionPushDown {
+
+ private final TableStore tableStore;
+ private final boolean streaming;
+ @Nullable private final DynamicTableFactory.Context logStoreContext;
+ @Nullable private final LogStoreTableFactory logStoreTableFactory;
+
+ private List<ResolvedExpression> partitionFilters = new ArrayList<>();
+ private List<ResolvedExpression> fieldFilters = new ArrayList<>();
+ @Nullable private int[][] projectFields;
+
+ public TableStoreSource(
+ TableStore tableStore,
+ boolean streaming,
+ @Nullable DynamicTableFactory.Context logStoreContext,
+ @Nullable LogStoreTableFactory logStoreTableFactory) {
+ this.tableStore = tableStore;
+ this.streaming = streaming;
+ this.logStoreContext = logStoreContext;
+ this.logStoreTableFactory = logStoreTableFactory;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return streaming
+ ? tableStore.valueCountMode()
+ ? ChangelogMode.all()
+ // TODO: optimize upsert when consistency mode is transactional and
+ // log.changelog-mode is all
+ : ChangelogMode.upsert()
+ : ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ LogSourceProvider logSourceProvider = null;
+ if (logStoreTableFactory != null) {
+ logSourceProvider =
+ logStoreTableFactory.createSourceProvider(
+ logStoreContext,
+ new LogStoreTableFactory.SourceContext() {
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ DataType producedDataType) {
+ return scanContext.createTypeInformation(producedDataType);
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType producedLogicalType) {
+ return scanContext.createTypeInformation(producedLogicalType);
+ }
+
+ @Override
+ public DataStructureConverter createDataStructureConverter(
+ DataType producedDataType) {
+ return scanContext.createDataStructureConverter(
+ producedDataType);
+ }
+ });
+ }
+ TableStore.SourceBuilder builder =
+ tableStore
+ .sourceBuilder()
+ .withContinuousMode(streaming)
+ .withHybridMode(streaming && logSourceProvider != null)
+ .withLogSourceProvider(logSourceProvider)
+ .withProjection(projectFields)
+ .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
+ .withFieldPredicate(PredicateConverter.convert(fieldFilters));
+ return SourceProvider.of(builder.build());
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ TableStoreSource copied =
+ new TableStoreSource(tableStore, streaming, logStoreContext, logStoreTableFactory);
+ copied.partitionFilters = new ArrayList<>(partitionFilters);
+ copied.fieldFilters = new ArrayList<>(fieldFilters);
+ copied.projectFields = projectFields;
+ return copied;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "TableStoreSource";
+ }
+
+ /**
+ * Records the filters pushed down by the planner.
+ *
+ * <p>For a partitioned table, filters that reference no non-partition field are kept as
+ * partition filters and all others as field filters. For a non-partitioned table every
+ * filter is a field filter. All filters are reported as accepted, and the field filters are
+ * also reported as remaining.
+ *
+ * <p>Filters collected by an earlier call are discarded first, so a repeated call replaces
+ * the state instead of appending duplicates. The caller's list is copied, not retained.
+ *
+ * @param filters filters offered by the planner
+ * @return all filters as accepted, the field filters as remaining
+ */
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ partitionFilters = new ArrayList<>();
+ fieldFilters = new ArrayList<>();
+ if (tableStore.partitioned()) {
+ classifyFilters(filters);
+ } else {
+ fieldFilters.addAll(filters);
+ }
+ return Result.of(
+ new ArrayList<>(filters),
+ fieldFilters == null ? Collections.emptyList() : fieldFilters);
+ }
+
+ @Override
+ public boolean supportsNestedProjection() {
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+ this.projectFields = projectedFields;
+ }
+
+ private void classifyFilters(List<ResolvedExpression> filters) {
+ List<String> fieldNames = tableStore.fieldNames();
+ List<String> partitionKeys = tableStore.partitionKeys();
+ PartitionIndexVisitor visitor =
+ new PartitionIndexVisitor(
+ fieldNames.stream().mapToInt(partitionKeys::indexOf).toArray());
+ filters.forEach(
+ filter -> {
+ try {
+ partitionFilters.add(filter.accept(visitor));
+ } catch (FoundFieldReference e) {
+ fieldFilters.add(filter);
+ }
+ });
+ }
+
+ private static class PartitionIndexVisitor implements ExpressionVisitor<ResolvedExpression> {
+
+ private final int[] mapping;
+
+ PartitionIndexVisitor(int[] mapping) {
+ this.mapping = mapping;
+ }
+
+ @Override
+ public ResolvedExpression visit(CallExpression call) {
+ return CallExpression.anonymous(
+ call.getFunctionDefinition(),
+ call.getResolvedChildren().stream()
+ .map(e -> e.accept(this))
+ .collect(Collectors.toList()),
+ call.getOutputDataType());
+ }
+
+ @Override
+ public ResolvedExpression visit(ValueLiteralExpression valueLiteral) {
+ return valueLiteral;
+ }
+
+ @Override
+ public ResolvedExpression visit(FieldReferenceExpression fieldReference) {
+ int adjustIndex = mapping[fieldReference.getFieldIndex()];
+ if (adjustIndex == -1) {
+ // not a partition field
+ throw new FoundFieldReference();
+ }
+ return new FieldReferenceExpression(
+ fieldReference.getName(),
+ fieldReference.getOutputDataType(),
+ fieldReference.getInputIndex(),
+ adjustIndex);
+ }
+
+ @Override
+ public ResolvedExpression visit(TypeLiteralExpression typeLiteral) {
+ return typeLiteral;
+ }
+
+ @Override
+ public ResolvedExpression visit(Expression other) {
+ return (ResolvedExpression) other;
+ }
+ }
+
+ private static class FoundFieldReference extends RuntimeException {}
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
new file mode 100644
index 0000000..083aaa0
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for testing create managed table ddl. */
+@RunWith(Parameterized.class)
+public class CreateTableITCase extends TableStoreTestBase {
+
+ protected final boolean ignoreException;
+
+ protected ResolvedCatalogTable resolvedTable =
+ createResolvedTable(
+ Collections.emptyMap(),
+ RowType.of(new IntType(), new VarCharType()),
+ Collections.emptyList(),
+ new int[0]);
+
+ /**
+ * Creates one parameterized CREATE TABLE case.
+ *
+ * @param executionMode forwarded to {@link TableStoreTestBase}
+ * @param tableName forwarded to {@link TableStoreTestBase}
+ * @param enableChangeTracking forwarded to {@link TableStoreTestBase}
+ * @param ignoreException passed as the {@code ignoreIfExists} flag when the DDL is built, i.e.
+ * whether the statement carries {@code IF NOT EXISTS}
+ * @param expectedResult forwarded to {@link TableStoreTestBase}
+ */
+ public CreateTableITCase(
+ RuntimeExecutionMode executionMode,
+ String tableName,
+ boolean enableChangeTracking,
+ boolean ignoreException,
+ ExpectedResult expectedResult) {
+ super(executionMode, tableName, enableChangeTracking, expectedResult);
+ this.ignoreException = ignoreException;
+ }
+
+ /**
+  * Executes the generated CREATE TABLE statement and verifies catalog, file store path and log
+  * store topic for the success case, or the thrown cause and the catalog state for the
+  * failure case.
+  */
+ @Test
+ public void testCreateTable() {
+     final String ddl =
+             ShowCreateUtil.buildShowCreateTable(
+                     resolvedTable, tableIdentifier, ignoreException);
+
+     if (!expectedResult.success) {
+         // check inconsistency between catalog/file store/log store
+         assertThat(ignoreException).isFalse();
+         assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                 .getCause()
+                 .isInstanceOf(expectedResult.expectedType)
+                 .hasMessageContaining(expectedResult.expectedMessage);
+
+         final String alreadyInCatalog =
+                 String.format("already exists in Catalog %s", CURRENT_CATALOG);
+         if (!expectedResult.expectedMessage.contains(alreadyInCatalog)) {
+             // throw exception when creating file path/topic, and catalog meta does not
+             // exist
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isNotPresent();
+         } else {
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isPresent();
+         }
+         return;
+     }
+
+     tEnv.executeSql(ddl);
+     // check catalog
+     assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+             .isPresent();
+     // check table store
+     assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+             .exists();
+     // check log store
+     assertThat(topicExists(tableIdentifier.asSummaryString()))
+             .isEqualTo(enableChangeTracking);
+ }
+
+ /**
+  * Puts catalog, file store path and log store topic into the state the current case expects
+  * before the CREATE TABLE statement runs.
+  */
+ @Override
+ public void prepareEnv() {
+     if (expectedResult.success) {
+         // ensure catalog doesn't contain the table meta
+         tEnv.getCatalog(tEnv.getCurrentCatalog())
+                 .ifPresent(
+                         catalog -> {
+                             try {
+                                 catalog.dropTable(tableIdentifier.toObjectPath(), false);
+                             } catch (TableNotExistException ignored) {
+                                 // the table was not registered, nothing to clean up
+                             }
+                         });
+         // ensure the log store doesn't contain the topic
+         if (enableChangeTracking && !ignoreException) {
+             deleteTopicIfExists(tableIdentifier.asSummaryString());
+         }
+         return;
+     }
+
+     final String expectedMessage = expectedResult.expectedMessage;
+     if (expectedMessage.startsWith("Failed to create file store path.")) {
+         // failed when creating file store: the table directory is created up front
+         Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
+         return;
+     }
+     if (expectedMessage.startsWith("Failed to create kafka topic.")) {
+         // failed when creating log store: the topic is created up front
+         createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
+         return;
+     }
+
+     // failed when registering schema to catalog: the table is registered up front
+     tEnv.getCatalog(tEnv.getCurrentCatalog())
+             .ifPresent(
+                     catalog -> {
+                         try {
+                             catalog.createTable(
+                                     tableIdentifier.toObjectPath(), resolvedTable, false);
+                         } catch (TableAlreadyExistException
+                                 | DatabaseNotExistException ignored) {
+                             // ignored
+                         }
+                     });
+ }
+
+ /**
+  * Parameter sets: four successful cases covering every combination of change tracking and
+  * {@code ignoreException}, followed by three failure cases (file store path, kafka topic,
+  * table already registered in the catalog).
+  */
+ @Parameterized.Parameters(
+         name =
+                 "executionMode-{0}, tableName-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+ public static List<Object[]> data() {
+     final RuntimeExecutionMode mode = RuntimeExecutionMode.STREAMING;
+     final boolean[] flags = {true, false};
+     final List<Object[]> cases = new ArrayList<>();
+
+     // successful case specs
+     for (boolean ignoreException : flags) {
+         for (boolean enableChangeTracking : flags) {
+             cases.add(
+                     new Object[] {
+                         mode,
+                         "table_" + UUID.randomUUID(),
+                         enableChangeTracking,
+                         ignoreException,
+                         new ExpectedResult().success(true)
+                     });
+         }
+     }
+
+     // failed case specs
+     cases.add(
+             new Object[] {
+                 mode,
+                 "table_" + UUID.randomUUID(),
+                 false,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(TableException.class)
+                         .expectedMessage("Failed to create file store path.")
+             });
+     cases.add(
+             new Object[] {
+                 mode,
+                 "table_" + UUID.randomUUID(),
+                 true,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(TableException.class)
+                         .expectedMessage("Failed to create kafka topic.")
+             });
+
+     // the expected message embeds the table name, so the name is fixed before the spec
+     final String existingTable = "table_" + UUID.randomUUID();
+     final String existingTablePath =
+             ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, existingTable)
+                     .toObjectPath()
+                     .getFullName();
+     cases.add(
+             new Object[] {
+                 mode,
+                 existingTable,
+                 true,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(TableAlreadyExistException.class)
+                         .expectedMessage(
+                                 String.format(
+                                         "Table (or view) %s already exists in Catalog %s.",
+                                         existingTablePath, CURRENT_CATALOG))
+             });
+     return cases;
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
new file mode 100644
index 0000000..c683604
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for testing drop managed table ddl. */
+@RunWith(Parameterized.class)
+public class DropTableITCase extends TableStoreTestBase {
+
+ protected final boolean ignoreException;
+
+ protected ResolvedCatalogTable resolvedTable =
+ createResolvedTable(
+ Collections.emptyMap(),
+ RowType.of(new IntType(), new VarCharType()),
+ Collections.emptyList(),
+ new int[0]);
+
+ /**
+ * Creates one parameterized DROP TABLE case.
+ *
+ * @param executionMode forwarded to {@link TableStoreTestBase}
+ * @param tableName forwarded to {@link TableStoreTestBase}
+ * @param enableChangeTracking forwarded to {@link TableStoreTestBase}
+ * @param ignoreException whether the generated statement carries {@code IF EXISTS}
+ * @param expectedResult forwarded to {@link TableStoreTestBase}
+ */
+ public DropTableITCase(
+ RuntimeExecutionMode executionMode,
+ String tableName,
+ boolean enableChangeTracking,
+ boolean ignoreException,
+ ExpectedResult expectedResult) {
+ super(executionMode, tableName, enableChangeTracking, expectedResult);
+ this.ignoreException = ignoreException;
+ }
+
+ /**
+  * Executes the DROP TABLE statement and verifies that catalog, file store path and log store
+  * topic are gone for the success case, or the thrown exception and the catalog state for
+  * the failure case.
+  */
+ @Test
+ public void testDropTable() {
+     final String existsClause = ignoreException ? " IF EXISTS " : " ";
+     final String ddl =
+             String.format(
+                     "DROP TABLE%s%s\n", existsClause, tableIdentifier.asSerializableString());
+
+     if (!expectedResult.success) {
+         // check inconsistency between catalog/file store/log store
+         assertThat(ignoreException).isFalse();
+         if (!ValidationException.class.isAssignableFrom(expectedResult.expectedType)) {
+             assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                     .getCause()
+                     .isInstanceOf(expectedResult.expectedType)
+                     .hasMessageContaining(expectedResult.expectedMessage);
+             // throw exception when deleting file path/topic, so schema still exists in
+             // catalog
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isPresent();
+         } else {
+             // the validation error is asserted on the thrown exception itself, not its cause
+             assertThatThrownBy(() -> tEnv.executeSql(ddl))
+                     .isInstanceOf(expectedResult.expectedType)
+                     .hasMessageContaining(expectedResult.expectedMessage);
+             assertThat(
+                             ((TableEnvironmentImpl) tEnv)
+                                     .getCatalogManager()
+                                     .getTable(tableIdentifier))
+                     .isNotPresent();
+         }
+         return;
+     }
+
+     tEnv.executeSql(ddl);
+     // check catalog
+     assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
+             .isNotPresent();
+     // check table store
+     assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
+             .doesNotExist();
+     // check log store
+     assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
+ }
+
+ /**
+  * Registers the table through the catalog manager, then removes the catalog entry, the file
+  * store path or the log store topic depending on what the current case expects.
+  */
+ @Override
+ public void prepareEnv() {
+     ((TableEnvironmentImpl) tEnv)
+             .getCatalogManager()
+             .createTable(resolvedTable, tableIdentifier, false);
+
+     // removes only the catalog entry; a missing table is tolerated
+     final Runnable dropCatalogMeta =
+             () ->
+                     tEnv.getCatalog(tEnv.getCurrentCatalog())
+                             .ifPresent(
+                                     catalog -> {
+                                         try {
+                                             catalog.dropTable(
+                                                     tableIdentifier.toObjectPath(), false);
+                                         } catch (TableNotExistException ignored) {
+                                             // ignored
+                                         }
+                                     });
+
+     if (expectedResult.success) {
+         if (ignoreException) {
+             // delete catalog schema does not affect dropping the table
+             dropCatalogMeta.run();
+             // delete file store path does not affect dropping the table
+             deleteTablePath();
+             // delete log store topic does not affect dropping the table
+             if (enableChangeTracking) {
+                 deleteTopicIfExists(tableIdentifier.asSummaryString());
+             }
+         }
+         return;
+     }
+
+     final String expectedMessage = expectedResult.expectedMessage;
+     if (expectedMessage.startsWith("Failed to delete file store path.")) {
+         // failed when deleting file path
+         deleteTablePath();
+     } else if (expectedMessage.startsWith("Failed to delete kafka topic.")) {
+         // failed when deleting topic
+         deleteTopicIfExists(tableIdentifier.asSummaryString());
+     } else {
+         // failed when dropping catalog schema
+         dropCatalogMeta.run();
+     }
+ }
+
+ /**
+  * Parameter sets: four successful cases covering every combination of change tracking and
+  * {@code ignoreException}, followed by three failure cases (file store path, kafka topic,
+  * table missing from the catalog).
+  */
+ @Parameterized.Parameters(
+         name =
+                 "executionMode-{0}, tableName-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
+ public static List<Object[]> data() {
+     final RuntimeExecutionMode mode = RuntimeExecutionMode.STREAMING;
+     final boolean[] flags = {true, false};
+     final List<Object[]> cases = new ArrayList<>();
+
+     // successful case specs
+     for (boolean ignoreException : flags) {
+         for (boolean enableChangeTracking : flags) {
+             cases.add(
+                     new Object[] {
+                         mode,
+                         "table_" + UUID.randomUUID(),
+                         enableChangeTracking,
+                         ignoreException,
+                         new ExpectedResult().success(true)
+                     });
+         }
+     }
+
+     // failed case specs
+     cases.add(
+             new Object[] {
+                 mode,
+                 "table_" + UUID.randomUUID(),
+                 false,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(TableException.class)
+                         .expectedMessage("Failed to delete file store path.")
+             });
+     cases.add(
+             new Object[] {
+                 mode,
+                 "table_" + UUID.randomUUID(),
+                 true,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(TableException.class)
+                         .expectedMessage("Failed to delete kafka topic.")
+             });
+
+     // the expected message embeds the table name, so the name is fixed before the spec
+     final String missingTable = "table_" + UUID.randomUUID();
+     final String missingTableSummary =
+             ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, missingTable)
+                     .asSummaryString();
+     cases.add(
+             new Object[] {
+                 mode,
+                 missingTable,
+                 true,
+                 false,
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(ValidationException.class)
+                         .expectedMessage(
+                                 String.format(
+                                         "Table with identifier '%s' does not exist.",
+                                         missingTableSummary))
+             });
+     return cases;
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
new file mode 100644
index 0000000..eff7b77
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for testing querying managed table dml. */
+@RunWith(Parameterized.class)
+public class ReadWriteTableITCase extends TableStoreTestBase {
+
+ private final boolean hasPk;
+ @Nullable private final Boolean duplicate;
+
+ /**
+ * Creates one parameterized read/write case.
+ *
+ * @param executionMode forwarded to {@link TableStoreTestBase}
+ * @param tableName forwarded to {@link TableStoreTestBase}
+ * @param enableChangeTracking forwarded to {@link TableStoreTestBase}
+ * @param hasPk whether {@code prepareEnv} registers a source table with a primary key
+ * @param duplicate only read when {@code hasPk} is false: a non-null value selects the
+ * insert-only source (with or without duplicated rows), {@code null} selects the
+ * insert/update/delete source
+ * @param expectedResult forwarded to {@link TableStoreTestBase}
+ */
+ public ReadWriteTableITCase(
+ RuntimeExecutionMode executionMode,
+ String tableName,
+ boolean enableChangeTracking,
+ boolean hasPk,
+ @Nullable Boolean duplicate,
+ ExpectedResult expectedResult) {
+ super(executionMode, tableName, enableChangeTracking, expectedResult);
+ this.hasPk = hasPk;
+ this.duplicate = duplicate;
+ }
+
+ /** Drops the {@code source_table} registered for the case, then runs the inherited cleanup. */
+ @Override
+ public void after() {
+ tEnv.executeSql("DROP TABLE `source_table`");
+ super.after();
+ }
+
+ /**
+  * Copies {@code source_table} into the managed table and reads it back. The success case
+  * compares the rows and checks the snapshot/manifest directories (and the topic when change
+  * tracking is enabled); the failure case checks the thrown exception.
+  */
+ @Test
+ public void testReadWriteNonPartitioned() throws Exception {
+     final String insert =
+             String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
+     final String select = String.format("SELECT * FROM %s", tableIdentifier);
+
+     if (!expectedResult.success) {
+         assertThatThrownBy(
+                         () -> {
+                             tEnv.executeSql(insert).await();
+                             tEnv.executeSql(select).collect();
+                         })
+                 .isInstanceOf(expectedResult.expectedType)
+                 .hasMessageContaining(expectedResult.expectedMessage);
+         return;
+     }
+
+     tEnv.executeSql(insert).await();
+     final List<Row> actual = new ArrayList<>();
+     try (CloseableIterator<Row> rows = tEnv.executeSql(select).collect()) {
+         rows.forEachRemaining(actual::add);
+     }
+     assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
+
+     final String tablePath = getRelativeFileStoreTablePath(tableIdentifier);
+     // check snapshot file path
+     assertThat(Paths.get(rootPath, tablePath, "snapshot")).exists();
+     // check manifest file path
+     assertThat(Paths.get(rootPath, tablePath, "manifest")).exists();
+
+     if (enableChangeTracking) {
+         assertThat(topicExists(tableIdentifier.asSummaryString())).isTrue();
+     }
+ }
+
+ /**
+  * Parameter sets: three batch cases (insert-only without duplicates, insert-only with
+  * duplicates, insert-only with primary key) and one streaming case that is expected to fail.
+  */
+ @Parameterized.Parameters(
+         name =
+                 "executionMode-{0}, tableName-{1}, "
+                         + "enableChangeTracking-{2}, hasPk-{3},"
+                         + " duplicate-{4}, expectedResult-{5}")
+ public static List<Object[]> data() {
+     final List<Object[]> cases = new ArrayList<>();
+
+     // batch cases
+     cases.add(
+             new Object[] {
+                 RuntimeExecutionMode.BATCH,
+                 "table_" + UUID.randomUUID(),
+                 false, // change tracking disabled
+                 false, // no pk
+                 false, // insert-only source without duplicated rows
+                 new ExpectedResult().success(true).expectedRecords(insertOnlyCities(false))
+             });
+     cases.add(
+             new Object[] {
+                 RuntimeExecutionMode.BATCH,
+                 "table_" + UUID.randomUUID(),
+                 false, // change tracking disabled
+                 false, // no pk
+                 true, // insert-only source with duplicated rows
+                 new ExpectedResult().success(true).expectedRecords(insertOnlyCities(true))
+             });
+
+     // rates() lists "Euro" twice; dropping index 1 removes the earlier Euro row so a single
+     // row per currency remains
+     final List<Row> latestRates = new ArrayList<>(rates());
+     latestRates.remove(1);
+     cases.add(
+             new Object[] {
+                 RuntimeExecutionMode.BATCH,
+                 "table_" + UUID.randomUUID(),
+                 false, // change tracking disabled
+                 true, // has pk
+                 null, // duplicate flag is not read when hasPk is true
+                 new ExpectedResult().success(true).expectedRecords(latestRates)
+             });
+     // TODO: streaming with change-tracking
+
+     // exception case
+     cases.add(
+             new Object[] {
+                 RuntimeExecutionMode.STREAMING,
+                 "table_" + UUID.randomUUID(),
+                 false, // change tracking disabled
+                 false, // no pk
+                 null, // no duplicate flag: the insert/update/delete source is registered
+                 new ExpectedResult()
+                         .success(false)
+                         .expectedType(UnsupportedOperationException.class)
+                         .expectedMessage("File store continuous mode is not supported yet.")
+             });
+
+     // TODO: add overwrite case
+
+     return cases;
+ }
+
+ /**
+  * Registers the {@code source_table} variant (and the managed sink table) selected by
+  * {@code hasPk}, the execution mode and {@code duplicate}.
+  */
+ @Override
+ protected void prepareEnv() {
+     if (hasPk && executionMode == RuntimeExecutionMode.STREAMING) {
+         registerUpsertRecordsWithPk();
+     } else if (hasPk) {
+         registerInsertOnlyRecordsWithPk();
+     } else if (duplicate != null) {
+         registerInsertOnlyRecordsWithoutPk(duplicate);
+     } else {
+         // no duplicate flag given: use the changelog source
+         registerInsertUpdateDeleteRecordsWithoutPk();
+     }
+ }
+
+ /**
+  * Registers {@code source_table} on the values connector with an I,UA,UB,D changelog fed by
+  * {@code TestData.userChangelog()}, then registers the managed sink table.
+  */
+ private void registerInsertUpdateDeleteRecordsWithoutPk() {
+     final boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+     final String dataId = registerData(TestData.userChangelog());
+     final String ddl =
+             String.format(
+                     "CREATE TABLE source_table (\n"
+                             + " user_id STRING,\n"
+                             + " user_name STRING,\n"
+                             + " email STRING,\n"
+                             + " balance DECIMAL(18,2)\n"
+                             + ") WITH (\n"
+                             + " 'connector' = 'values',\n"
+                             + " 'bounded' = '%s',\n"
+                             + " 'data-id' = '%s',\n"
+                             + " 'changelog-mode' = 'I,UA,UB,D',\n"
+                             + " 'disable-lookup' = 'true'\n"
+                             + ")",
+                     bounded,
+                     dataId);
+     tEnv.executeSql(ddl);
+     registerTableStoreSink();
+ }
+
+ /**
+  * Registers an insert-only {@code source_table} without primary key, fed by
+  * {@link #insertOnlyCities(boolean)}, then registers the managed sink table.
+  *
+  * @param duplicate whether the source data contains every row twice
+  */
+ private void registerInsertOnlyRecordsWithoutPk(boolean duplicate) {
+     final boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+     final String dataId = registerData(insertOnlyCities(duplicate));
+     final String ddl =
+             String.format(
+                     "CREATE TABLE source_table (\n"
+                             + " name STRING NOT NULL,\n"
+                             + " state STRING NOT NULL,\n"
+                             + " pop INT NOT NULL\n"
+                             + ") WITH (\n"
+                             + " 'connector' = 'values',\n"
+                             + " 'bounded' = '%s',\n"
+                             + " 'data-id' = '%s',\n"
+                             + " 'changelog-mode' = 'I'\n"
+                             + ")",
+                     bounded,
+                     dataId);
+     tEnv.executeSql(ddl);
+     registerTableStoreSink();
+ }
+
+ /**
+  * Registers an insert-only {@code source_table} keyed by {@code currency}, fed by
+  * {@link #rates()}, then registers the managed sink table.
+  */
+ private void registerInsertOnlyRecordsWithPk() {
+     final boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+     final String dataId = registerData(rates());
+     final String ddl =
+             String.format(
+                     "CREATE TABLE source_table (\n"
+                             + " currency STRING,\n"
+                             + " rate BIGINT,\n"
+                             + " PRIMARY KEY (currency) NOT ENFORCED\n"
+                             + ") WITH (\n"
+                             + " 'connector' = 'values',\n"
+                             + " 'bounded' = '%s',\n"
+                             + " 'data-id' = '%s',\n"
+                             + " 'changelog-mode' = 'I',\n"
+                             + " 'disable-lookup' = 'true'\n"
+                             + ")",
+                     bounded,
+                     dataId);
+     tEnv.executeSql(ddl);
+     registerTableStoreSink();
+ }
+
+ /**
+  * Registers a {@code source_table} keyed by {@code currency} with a UA,D changelog, fed by
+  * {@link #ratesChangelog()}, then registers the managed sink table.
+  */
+ private void registerUpsertRecordsWithPk() {
+     final boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+     final String dataId = registerData(ratesChangelog());
+     final String ddl =
+             String.format(
+                     "CREATE TABLE source_table (\n"
+                             + " currency STRING,\n"
+                             + " rate BIGINT,\n"
+                             + " PRIMARY KEY (currency) NOT ENFORCED\n"
+                             + ") WITH (\n"
+                             + " 'connector' = 'values',\n"
+                             + " 'bounded' = '%s',\n"
+                             + " 'data-id' = '%s',\n"
+                             + " 'changelog-mode' = 'UA,D',\n"
+                             + " 'disable-lookup' = 'true'\n"
+                             + ")",
+                     bounded,
+                     dataId);
+     tEnv.executeSql(ddl);
+     registerTableStoreSink();
+ }
+
+ /**
+ * Insert-only (currency, rate) rows. "Euro" appears twice, first with rate 114 (index 1) and
+ * later with rate 119.
+ */
+ private static List<Row> rates() {
+ return Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("+I", "Euro", 119L));
+ }
+
+ /**
+ * (currency, rate) changelog rows mixing +I, +U and -D kinds; "Euro" is inserted, updated,
+ * deleted and re-inserted, and "Yen" is deleted by the last row.
+ */
+ private static List<Row> ratesChangelog() {
+ return Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("+U", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("+U", "Euro", 119L),
+ changelogRow("-D", "Yen", 1L));
+ }
+
+ /**
+  * Returns the rows of {@code TestData.citiesData()}.
+  *
+  * @param duplicate when true, the result contains the whole data set twice in a row
+  */
+ private static List<Row> insertOnlyCities(boolean duplicate) {
+     final List<Row> cities = JavaConverters.seqAsJavaList(TestData.citiesData());
+     if (!duplicate) {
+         return cities;
+     }
+     final List<Row> doubled = new ArrayList<>(cities);
+     doubled.addAll(cities);
+     return doubled;
+ }
+
+ /**
+ * Three insert-only (user_id, user_name, email, balance) rows.
+ *
+ * <p>NOTE(review): no call to this method is visible in this class; the changelog source is
+ * fed from {@code TestData.userChangelog()} instead. Confirm whether this helper is still
+ * needed.
+ */
+ private static List<Row> userChangelog() {
+ return Arrays.asList(
+ changelogRow("+I", "user1", "Tom", "tom123@gmail.com", new BigDecimal("8.10")),
+ changelogRow("+I", "user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99")),
+ changelogRow("+I", "user4", "Tina", "tina@gmail.com", new BigDecimal("11.30")));
+ }
+
+ /** Creates the table under test with the schema of {@code source_table} but none of its options. */
+ private void registerTableStoreSink() {
+     final String ddl =
+             String.format(
+                     "CREATE TABLE %s LIKE `source_table` (EXCLUDING OPTIONS)",
+                     tableIdentifier.asSerializableString());
+     tEnv.executeSql(ddl);
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java
new file mode 100644
index 0000000..118d5c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ShowCreateUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * SHOW CREATE statement Util.
+ *
+ * <p>This code is mostly copied from {@link org.apache.flink.table.api.internal.ShowCreateUtil}.
+ */
+public class ShowCreateUtil {
+
+ private ShowCreateUtil() {}
+
+ /**
+  * Renders a CREATE TABLE statement for the given resolved table.
+  *
+  * @param table the table to render; must not be a view
+  * @param tableIdentifier identifier used in the statement
+  * @param ignoreIfExists whether to emit {@code IF NOT EXISTS}
+  * @return the statement text: columns, optional watermark and primary key, optional comment,
+  *     optional PARTITIONED BY clause and optional WITH clause
+  * @throws TableException if {@code table} is a view
+  */
+ static String buildShowCreateTable(
+         ResolvedCatalogBaseTable<?> table,
+         ObjectIdentifier tableIdentifier,
+         boolean ignoreIfExists) {
+     if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+         throw new TableException(
+                 String.format(
+                         "SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.",
+                         tableIdentifier.asSerializableString()));
+     }
+     final String indent = " ";
+     final StringBuilder ddl = new StringBuilder();
+
+     // header and column list
+     ddl.append(buildCreateFormattedPrefix(ignoreIfExists, tableIdentifier));
+     ddl.append(extractFormattedColumns(table, indent));
+     // optional entries that continue the column list
+     extractFormattedWatermarkSpecs(table, indent)
+             .ifPresent(specs -> ddl.append(",\n").append(specs));
+     extractFormattedPrimaryKey(table, indent)
+             .ifPresent(primaryKey -> ddl.append(",\n").append(primaryKey));
+     ddl.append("\n) ");
+
+     // optional trailing clauses
+     extractFormattedComment(table)
+             .ifPresent(
+                     comment ->
+                             ddl.append("COMMENT '")
+                                     .append(comment)
+                                     .append("'")
+                                     .append(System.lineSeparator()));
+     extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
+             .ifPresent(keys -> ddl.append("PARTITIONED BY (").append(keys).append(")\n"));
+     extractFormattedOptions(table, indent)
+             .ifPresent(options -> ddl.append("WITH (\n").append(options).append("\n)\n"));
+     return ddl.toString();
+ }
+
+ /** Renders one indented line per column of the resolved schema, joined by {@code ",\n"}. */
+ static String extractFormattedColumns(ResolvedCatalogBaseTable<?> table, String printIndent) {
+     return table.getResolvedSchema().getColumns().stream()
+             .map(column -> printIndent + getColumnString(column))
+             .collect(Collectors.joining(",\n"));
+ }
+
+ /**
+  * Renders the {@code WATERMARK FOR ... AS ...} lines of the resolved schema.
+  *
+  * <p>NOTE(review): several specs are joined with a bare newline, not {@code ",\n"}; confirm
+  * that a schema can carry at most one watermark spec.
+  *
+  * @return the rendered lines, or empty when the schema declares no watermark
+  */
+ static Optional<String> extractFormattedWatermarkSpecs(
+         ResolvedCatalogBaseTable<?> table, String printIndent) {
+     if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+         return Optional.empty();
+     }
+     final String rendered =
+             table.getResolvedSchema().getWatermarkSpecs().stream()
+                     .map(
+                             spec ->
+                                     printIndent
+                                             + "WATERMARK FOR "
+                                             + EncodingUtils.escapeIdentifier(
+                                                     spec.getRowtimeAttribute())
+                                             + " AS "
+                                             + spec.getWatermarkExpression()
+                                                     .asSerializableString())
+                     .collect(Collectors.joining("\n"));
+     return Optional.of(rendered);
+ }
+
+ /** Returns the table comment with single quotes escaped, or empty when it is null or empty. */
+ static Optional<String> extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
+     final String comment = table.getComment();
+     if (!StringUtils.isNotEmpty(comment)) {
+         return Optional.empty();
+     }
+     return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
+ }
+
+ /**
+  * Returns the escaped partition keys joined by {@code ", "}, or empty when the table is not
+  * partitioned.
+  */
+ static Optional<String> extractFormattedPartitionedInfo(ResolvedCatalogTable catalogTable) {
+     if (catalogTable.isPartitioned()) {
+         final String keys =
+                 catalogTable.getPartitionKeys().stream()
+                         .map(EncodingUtils::escapeIdentifier)
+                         .collect(Collectors.joining(", "));
+         return Optional.of(keys);
+     }
+     return Optional.empty();
+ }
+
+ /**
+  * Renders the table options as indented {@code 'key' = 'value'} lines joined by
+  * {@code ",\n"}, with single quotes escaped in keys and values.
+  *
+  * @return the rendered lines, or empty when the options are null or empty
+  */
+ static Optional<String> extractFormattedOptions(
+         ResolvedCatalogBaseTable<?> table, String printIndent) {
+     if (table.getOptions() == null || table.getOptions().isEmpty()) {
+         return Optional.empty();
+     }
+     final String rendered =
+             table.getOptions().entrySet().stream()
+                     .map(
+                             option ->
+                                     printIndent
+                                             + "'"
+                                             + EncodingUtils.escapeSingleQuotes(option.getKey())
+                                             + "' = '"
+                                             + EncodingUtils.escapeSingleQuotes(
+                                                     option.getValue())
+                                             + "'")
+                     .collect(Collectors.joining(",\n"));
+     return Optional.of(rendered);
+ }
+
+ /**
+  * Renders the statement head up to and including the opening parenthesis and a line
+  * separator.
+  *
+  * @param ignoreIfExists whether to insert {@code IF NOT EXISTS} after {@code CREATE TABLE}
+  * @param identifier table identifier, rendered in its serializable form
+  */
+ static String buildCreateFormattedPrefix(boolean ignoreIfExists, ObjectIdentifier identifier) {
+     final String existsClause = ignoreIfExists ? " IF NOT EXISTS " : "";
+     return "CREATE TABLE"
+             + existsClause
+             + " "
+             + identifier.asSerializableString()
+             + " ("
+             + System.lineSeparator();
+ }
+
+ /**
+  * Returns the schema's primary key constraint prefixed with the indent, or empty when the
+  * schema declares no primary key.
+  */
+ static Optional<String> extractFormattedPrimaryKey(
+         ResolvedCatalogBaseTable<?> table, String printIndent) {
+     return table.getResolvedSchema()
+             .getPrimaryKey()
+             .map(constraint -> String.format("%s%s", printIndent, constraint));
+ }
+
+ /**
+  * Renders a single column definition: the escaped name followed by either the computed
+  * column's expression or the serialized data type plus any extras.
+  *
+  * @throws TableException if a computed column provides no expression
+  */
+ static String getColumnString(Column column) {
+     final StringBuilder definition = new StringBuilder();
+     definition.append(EncodingUtils.escapeIdentifier(column.getName())).append(" ");
+
+     if (!(column instanceof Column.ComputedColumn)) {
+         definition.append(column.getDataType().getLogicalType().asSerializableString());
+         column.explainExtras().ifPresent(extras -> definition.append(" ").append(extras));
+     } else {
+         // skip data type for computed column
+         definition.append(
+                 column.explainExtras()
+                         .orElseThrow(
+                                 () ->
+                                         new TableException(
+                                                 String.format(
+                                                         "Column expression can not be null for computed column '%s'",
+                                                         column.getName()))));
+     }
+     // TODO: Print the column comment until FLINK-18958 is fixed
+     return definition.toString();
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
index e0c9b5b..9b8d006 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreFactoryTest.java
@@ -158,10 +158,48 @@ public class TableStoreFactoryTest {
addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true);
enrichedOptions.put("foo", "bar");
- assertThat(((TableStoreFactory) tableStoreFactory).filterLogStoreOptions(enrichedOptions))
+ assertThat(TableStoreFactory.filterLogStoreOptions(enrichedOptions))
.containsExactlyInAnyOrderEntriesOf(expectedLogOptions);
}
+ @Test
+ public void testFilterFileStoreOptions() {
+     // An unknown key with an empty value is kept on purpose: validation is deferred, so the
+     // filter is only expected to drop the "log."-prefixed entries added below.
+     Map<String, String> fileStoreOnly = of("dummy.key", "", FILE_PATH.key(), "dummy:/foo/bar");
+     Map<String, String> mixed = new HashMap<>(fileStoreOnly);
+     mixed.put("log.bar", "foo");
+     mixed.put("log.foo", "bar");
+
+     assertThat(TableStoreFactory.filterFileStoreOptions(mixed))
+             .containsExactlyInAnyOrderEntriesOf(fileStoreOnly);
+ }
+
+ @Test
+ public void testTablePath() {
+     // A configured root dir is extended with the root/catalog/database/table segments.
+     Map<String, String> withRootDir = of(FILE_PATH.key(), "dummy:/foo/bar");
+     org.apache.flink.core.fs.Path expectedPath =
+             new org.apache.flink.core.fs.Path(
+                     "dummy:/foo/bar/root/catalog.catalog/database.db/table");
+     assertThat(TableStoreFactory.tablePath(withRootDir, TABLE_IDENTIFIER))
+             .isEqualTo(expectedPath);
+
+     // Without a root dir the call must fail and explain both ways of configuring one.
+     String expectedHint =
+             "Failed to create file store path. "
+                     + "Please specify a root dir by setting session level configuration "
+                     + "as `SET 'table-store.file.path' = '...'`. "
+                     + "Alternatively, you can use a per-table root dir "
+                     + "as `CREATE TABLE ${table} (...) WITH ('file.path' = '...')`";
+     assertThatThrownBy(
+                     () -> TableStoreFactory.tablePath(Collections.emptyMap(), TABLE_IDENTIFIER))
+             .isInstanceOf(IllegalArgumentException.class)
+             .hasMessageContaining(expectedHint);
+ }
+
+ @ParameterizedTest
+ @MethodSource("providingEnrichedOptionsForChangeTracking")
+ public void testEnableChangeTracking(Map<String, String> options, boolean expected) {
+     // Each (options, expected) pair is supplied by the method source named above.
+     boolean changeTrackingEnabled = TableStoreFactory.enableChangeTracking(options);
+     assertThat(changeTrackingEnabled).isEqualTo(expected);
+ }
+
// ~ Tools ------------------------------------------------------------------
private static Stream<Arguments> providingOptions() {
@@ -240,6 +278,14 @@ public class TableStoreFactoryTest {
Arguments.of(enrichedOptions, false));
}
+ /**
+  * Supplies (options, expected) pairs for {@code testEnableChangeTracking}: an empty map and an
+  * explicit "true" expect {@code true}, an explicit "false" expects {@code false}, and a
+  * "false" stored under the {@code TABLE_STORE_PREFIX}-prefixed key still expects {@code true}.
+  */
+ private static Stream<Arguments> providingEnrichedOptionsForChangeTracking() {
+     final String key = CHANGE_TRACKING.key();
+     return Stream.of(
+             Arguments.of(Collections.emptyMap(), true),
+             Arguments.of(of(key, "true"), true),
+             Arguments.of(of(key, "false"), false),
+             Arguments.of(of(TABLE_STORE_PREFIX + key, "false"), true));
+ }
+
private static Map<String, String> addPrefix(
Map<String, String> options, String prefix, Predicate<String> predicate) {
Map<String, String> newOptions = new HashMap<>();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
deleted file mode 100644
index 4206970..0000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreITCase.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.utils.TypeConversions;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
-import static org.apache.flink.table.store.connector.TableStoreITCase.StatementType.CREATE_STATEMENT;
-import static org.apache.flink.table.store.connector.TableStoreITCase.StatementType.DROP_STATEMENT;
-import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
-import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
-import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** End-to-end tests for table store. */
-@RunWith(Parameterized.class)
-public class TableStoreITCase extends KafkaTableTestBase {
-
- private static final String CURRENT_CATALOG = "catalog";
- private static final String CURRENT_DATABASE = "database";
-
- private final ObjectIdentifier tableIdentifier;
- private final StatementType statementType;
- private final boolean enableChangeTracking;
- private final boolean ignoreException;
- private final ExpectedResult expectedResult;
-
- private String rootPath;
- private ResolvedCatalogTable resolvedTable;
- @Rule public TestName name = new TestName();
-
- public TableStoreITCase(
- String tableName,
- StatementType statementType,
- boolean enableChangeTracking,
- boolean ignoreException,
- ExpectedResult expectedResult) {
- this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
- this.statementType = statementType;
- this.enableChangeTracking = enableChangeTracking;
- this.ignoreException = ignoreException;
- this.expectedResult = expectedResult;
- }
-
- @Parameterized.Parameters(
- name =
- "tableName-{0}, statementType-{1}, enableChangeTracking-{2}, ignoreException-{3}, expectedResult-{4}")
- public static Collection<Object[]> data() {
- return Stream.concat(prepareCreateTableSpecs().stream(), prepareDropTableSpecs().stream())
- .collect(Collectors.toList());
- }
-
- @Before
- @Override
- public void setup() {
- super.setup();
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .registerCatalog(
- CURRENT_CATALOG,
- new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE));
- tEnv.useCatalog(CURRENT_CATALOG);
- resolvedTable =
- createResolvedTable(
- Collections.emptyMap(),
- RowType.of(new IntType(), new VarCharType()),
- new int[0]);
- try {
- rootPath = TEMPORARY_FOLDER.newFolder().getPath();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- prepareSessionContext();
- // match parameter type with test name to conditionally skip before setup, because junit4
- // doesn't support multiple data providers for different methods
- if (name.getMethodName().startsWith("testCreateTable")
- && statementType == CREATE_STATEMENT) {
- prepareEnvForCreateTable();
- } else if (name.getMethodName().startsWith("testDropTable")
- && statementType == DROP_STATEMENT) {
- prepareEnvForDropTable();
- }
- }
-
- @Test
- public void testCreateTable() {
- Assume.assumeTrue(statementType == CREATE_STATEMENT);
- final String ddl =
- String.format(
- "CREATE TABLE%s%s (f0 INT, f1 STRING)\n",
- ignoreException ? " IF NOT EXISTS " : " ",
- tableIdentifier.asSerializableString());
- if (expectedResult.success) {
- tEnv.executeSql(ddl);
- // check catalog
- assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
- .isPresent();
- // check table store
- assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
- .exists();
- // check log store
- assertThat(topicExists(tableIdentifier.asSummaryString()))
- .isEqualTo(enableChangeTracking);
- } else {
- // check inconsistency between catalog/file store/log store
- assertThat(ignoreException).isFalse();
- assertThatThrownBy(() -> tEnv.executeSql(ddl))
- .getCause()
- .isInstanceOf(expectedResult.expectedType)
- .hasMessageContaining(expectedResult.expectedMessage);
-
- if (expectedResult.expectedMessage.contains(
- String.format("already exists in Catalog %s", CURRENT_CATALOG))) {
- assertThat(
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .getTable(tableIdentifier))
- .isPresent();
- } else {
- // throw exception when creating file path/topic, and catalog meta does not exist
- assertThat(
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .getTable(tableIdentifier))
- .isNotPresent();
- }
- }
- }
-
- @Test
- public void testDropTable() {
- Assume.assumeTrue(statementType == DROP_STATEMENT);
- String ddl =
- String.format(
- "DROP TABLE%s%s\n",
- ignoreException ? " IF EXISTS " : " ",
- tableIdentifier.asSerializableString());
-
- if (expectedResult.success) {
- tEnv.executeSql(ddl);
- // check catalog
- assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier))
- .isNotPresent();
- // check table store
- assertThat(Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile())
- .doesNotExist();
- // check log store
- assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse();
- } else {
- // check inconsistency between catalog/file store/log store
- assertThat(ignoreException).isFalse();
- if (ValidationException.class.isAssignableFrom(expectedResult.expectedType)) {
- // successfully delete path/topic, but schema doesn't exist in catalog
- assertThatThrownBy(() -> tEnv.executeSql(ddl))
- .isInstanceOf(expectedResult.expectedType)
- .hasMessageContaining(expectedResult.expectedMessage);
- assertThat(
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .getTable(tableIdentifier))
- .isNotPresent();
- } else {
- assertThatThrownBy(() -> tEnv.executeSql(ddl))
- .getCause()
- .isInstanceOf(expectedResult.expectedType)
- .hasMessageContaining(expectedResult.expectedMessage);
- // throw exception when deleting file path/topic, so schema still exists in catalog
- assertThat(
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .getTable(tableIdentifier))
- .isPresent();
- }
- }
- }
-
- // ~ Tools ------------------------------------------------------------------
-
- private static List<Object[]> prepareCreateTableSpecs() {
- List<Object[]> specs = new ArrayList<>();
- // successful case specs
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- true,
- true,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- false,
- true,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- true,
- false,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- false,
- false,
- new ExpectedResult().success(true)
- });
-
- // failed case specs
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- false,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(TableException.class)
- .expectedMessage("Failed to create file store path.")
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- CREATE_STATEMENT,
- true,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(TableException.class)
- .expectedMessage("Failed to create kafka topic.")
- });
- final String tableName = "table_" + UUID.randomUUID();
- specs.add(
- new Object[] {
- tableName,
- CREATE_STATEMENT,
- true,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(TableAlreadyExistException.class)
- .expectedMessage(
- String.format(
- "Table (or view) %s already exists in Catalog %s.",
- ObjectIdentifier.of(
- CURRENT_CATALOG,
- CURRENT_DATABASE,
- tableName)
- .toObjectPath()
- .getFullName(),
- CURRENT_CATALOG))
- });
- return specs;
- }
-
- private static List<Object[]> prepareDropTableSpecs() {
- List<Object[]> specs = new ArrayList<>();
- // successful case specs
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- true,
- true,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- false,
- true,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- true,
- false,
- new ExpectedResult().success(true)
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- false,
- false,
- new ExpectedResult().success(true)
- });
-
- // failed case specs
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- false,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(TableException.class)
- .expectedMessage("Failed to delete file store path.")
- });
- specs.add(
- new Object[] {
- "table_" + UUID.randomUUID(),
- DROP_STATEMENT,
- true,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(TableException.class)
- .expectedMessage("Failed to delete kafka topic.")
- });
- final String tableName = "table_" + UUID.randomUUID();
- specs.add(
- new Object[] {
- tableName,
- DROP_STATEMENT,
- true,
- false,
- new ExpectedResult()
- .success(false)
- .expectedType(ValidationException.class)
- .expectedMessage(
- String.format(
- "Table with identifier '%s' does not exist.",
- ObjectIdentifier.of(
- CURRENT_CATALOG,
- CURRENT_DATABASE,
- tableName)
- .asSummaryString()))
- });
- return specs;
- }
-
- private void prepareSessionContext() {
- Configuration configuration = tEnv.getConfig().getConfiguration();
- configuration.setString(TABLE_STORE_PREFIX + FILE_PATH.key(), rootPath);
- configuration.setString(
- TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
- configuration.setBoolean(TABLE_STORE_PREFIX + CHANGE_TRACKING.key(), enableChangeTracking);
- }
-
- private void prepareEnvForCreateTable() {
- if (expectedResult.success) {
- // ensure catalog doesn't contain the table meta
- tEnv.getCatalog(tEnv.getCurrentCatalog())
- .ifPresent(
- (catalog) -> {
- try {
- catalog.dropTable(tableIdentifier.toObjectPath(), false);
- } catch (TableNotExistException ignored) {
- // ignored
- }
- });
- // ensure log store doesn't exist the topic
- if (enableChangeTracking && !ignoreException) {
- deleteTopicIfExists(tableIdentifier.asSummaryString());
- }
- } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) {
- // failed when creating file store
- Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile().mkdirs();
- } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) {
- // failed when creating log store
- createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue());
- } else {
- // failed when registering schema to catalog
- tEnv.getCatalog(tEnv.getCurrentCatalog())
- .ifPresent(
- (catalog) -> {
- try {
- catalog.createTable(
- tableIdentifier.toObjectPath(), resolvedTable, false);
- } catch (TableAlreadyExistException
- | DatabaseNotExistException ignored) {
- // ignored
- }
- });
- }
- }
-
- private void prepareEnvForDropTable() {
- ((TableEnvironmentImpl) tEnv)
- .getCatalogManager()
- .createTable(resolvedTable, tableIdentifier, false);
- if (expectedResult.success) {
- if (ignoreException) {
- // delete catalog schema does not affect dropping the table
- tEnv.getCatalog(tEnv.getCurrentCatalog())
- .ifPresent(
- (catalog) -> {
- try {
- catalog.dropTable(tableIdentifier.toObjectPath(), false);
- } catch (TableNotExistException ignored) {
- // ignored
- }
- });
- // delete file store path does not affect dropping the table
- deleteTablePath();
- // delete log store topic does not affect dropping the table
- if (enableChangeTracking) {
- deleteTopicIfExists(tableIdentifier.asSummaryString());
- }
- }
- } else if (expectedResult.expectedMessage.startsWith("Failed to delete file store path.")) {
- // failed when deleting file path
- deleteTablePath();
- } else if (expectedResult.expectedMessage.startsWith("Failed to delete kafka topic.")) {
- // failed when deleting topic
- deleteTopicIfExists(tableIdentifier.asSummaryString());
- } else {
- // failed when dropping catalog schema
- tEnv.getCatalog(tEnv.getCurrentCatalog())
- .ifPresent(
- (catalog) -> {
- try {
- catalog.dropTable(tableIdentifier.toObjectPath(), false);
- } catch (TableNotExistException ignored) {
- // ignored
- }
- });
- }
- }
-
- private void deleteTablePath() {
- try {
- FileUtils.deleteDirectory(
- Paths.get(rootPath, getRelativeFileStoreTablePath(tableIdentifier)).toFile());
- } catch (IOException ignored) {
- // ignored
- }
- }
-
- static ResolvedCatalogTable createResolvedTable(
- Map<String, String> options, RowType rowType, int[] pk) {
- List<String> fieldNames = rowType.getFieldNames();
- List<DataType> fieldDataTypes =
- rowType.getChildren().stream()
- .map(TypeConversions::fromLogicalToDataType)
- .collect(Collectors.toList());
- CatalogTable origin =
- CatalogTable.of(
- Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
- "a comment",
- Collections.emptyList(),
- options);
- List<Column> resolvedColumns =
- IntStream.range(0, fieldNames.size())
- .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
- .collect(Collectors.toList());
- UniqueConstraint constraint = null;
- if (pk.length > 0) {
- List<String> pkNames =
- Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
- constraint = UniqueConstraint.primaryKey("pk", pkNames);
- }
- return new ResolvedCatalogTable(
- origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
- }
-
- static String getRelativeFileStoreTablePath(ObjectIdentifier tableIdentifier) {
- return String.format(
- "root/%s.catalog/%s.db/%s",
- tableIdentifier.getCatalogName(),
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getObjectName());
- }
-
- enum StatementType {
- CREATE_STATEMENT,
- DROP_STATEMENT
- }
-
- private static class ExpectedResult {
- private boolean success;
- private Class<? extends Throwable> expectedType;
- private String expectedMessage;
-
- ExpectedResult success(boolean success) {
- this.success = success;
- return this;
- }
-
- ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
- this.expectedType = exceptionClazz;
- return this;
- }
-
- ExpectedResult expectedMessage(String exceptionMessage) {
- this.expectedMessage = exceptionMessage;
- return this;
- }
-
- @Override
- public String toString() {
- return "ExpectedResult{"
- + "success="
- + success
- + ", expectedType="
- + expectedType
- + ", expectedMessage='"
- + expectedMessage
- + '\''
- + '}';
- }
- }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
new file mode 100644
index 0000000..4abc7f9
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.CHANGE_TRACKING;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
+
+/**
+ * Base class for table store end-to-end tests.
+ *
+ * <p>Before each test, {@link #setup()} recreates the table environment for the requested runtime
+ * mode, registers a {@link GenericInMemoryCatalog}, creates a new folder for the file store, writes
+ * the session-level table store options and finally hands over to {@link #prepareEnv()}.
+ */
+public abstract class TableStoreTestBase extends KafkaTableTestBase {
+
+    public static final String CURRENT_CATALOG = "catalog";
+    public static final String CURRENT_DATABASE = "database";
+
+    protected final RuntimeExecutionMode executionMode;
+    protected final ObjectIdentifier tableIdentifier;
+    protected final boolean enableChangeTracking;
+    protected final ExpectedResult expectedResult;
+
+    /** Root folder used as the file store path; assigned in {@link #setup()}. */
+    protected String rootPath;
+
+    public TableStoreTestBase(
+            RuntimeExecutionMode executionMode,
+            String tableName,
+            boolean enableChangeTracking,
+            ExpectedResult expectedResult) {
+        this.executionMode = executionMode;
+        this.enableChangeTracking = enableChangeTracking;
+        this.expectedResult = expectedResult;
+        this.tableIdentifier = ObjectIdentifier.of(CURRENT_CATALOG, CURRENT_DATABASE, tableName);
+    }
+
+    /** Test-specific preparation, invoked as the last step of {@link #setup()}. */
+    protected abstract void prepareEnv();
+
+    @Before
+    @Override
+    public void setup() {
+        super.setup();
+
+        // Apply the runtime mode first; checkpointing is switched on for streaming runs only.
+        env.setRuntimeMode(executionMode);
+        if (RuntimeExecutionMode.STREAMING == executionMode) {
+            env.enableCheckpointing(100);
+        }
+        tEnv = StreamTableEnvironment.create(env);
+
+        GenericInMemoryCatalog catalog =
+                new GenericInMemoryCatalog(CURRENT_CATALOG, CURRENT_DATABASE);
+        ((TableEnvironmentImpl) tEnv).getCatalogManager().registerCatalog(CURRENT_CATALOG, catalog);
+        tEnv.useCatalog(CURRENT_CATALOG);
+
+        try {
+            rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        // Both steps read rootPath and tEnv, so they have to run last.
+        prepareSessionContext();
+        prepareEnv();
+    }
+
+    /**
+     * Stores the file store path, the log store bootstrap servers and the change-tracking flag in
+     * the table environment configuration, each under a {@code TABLE_STORE_PREFIX}-prefixed key.
+     */
+    protected void prepareSessionContext() {
+        Configuration sessionConf = tEnv.getConfig().getConfiguration();
+        String filePathKey = TABLE_STORE_PREFIX + FILE_PATH.key();
+        String bootstrapServersKey = TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key();
+        String changeTrackingKey = TABLE_STORE_PREFIX + CHANGE_TRACKING.key();
+
+        sessionConf.setString(filePathKey, rootPath);
+        sessionConf.setString(bootstrapServersKey, getBootstrapServers());
+        sessionConf.setBoolean(changeTrackingKey, enableChangeTracking);
+    }
+
+    /**
+     * Creates a resolved catalog table whose columns are all physical and mirror {@code rowType}.
+     *
+     * @param options table options handed to the catalog table
+     * @param rowType source of the field names and types
+     * @param partitionKeys partition key names handed to the catalog table
+     * @param pk field indexes forming the primary key named "pk"; an empty array means no key
+     */
+    protected static ResolvedCatalogTable createResolvedTable(
+            Map<String, String> options, RowType rowType, List<String> partitionKeys, int[] pk) {
+        List<String> names = rowType.getFieldNames();
+        List<DataType> types =
+                rowType.getChildren().stream()
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .collect(Collectors.toList());
+
+        Schema unresolvedSchema = Schema.newBuilder().fromFields(names, types).build();
+        CatalogTable origin =
+                CatalogTable.of(unresolvedSchema, "a comment", partitionKeys, options);
+
+        List<Column> columns =
+                IntStream.range(0, names.size())
+                        .mapToObj(idx -> Column.physical(names.get(idx), types.get(idx)))
+                        .collect(Collectors.toList());
+        UniqueConstraint primaryKey =
+                pk.length == 0
+                        ? null
+                        : UniqueConstraint.primaryKey(
+                                "pk",
+                                Arrays.stream(pk)
+                                        .mapToObj(names::get)
+                                        .collect(Collectors.toList()));
+        return new ResolvedCatalogTable(
+                origin, new ResolvedSchema(columns, Collections.emptyList(), primaryKey));
+    }
+
+    /** Deletes this test's table directory below {@link #rootPath} without raising on failure. */
+    protected void deleteTablePath() {
+        String relativeTablePath = getRelativeFileStoreTablePath(tableIdentifier);
+        FileUtils.deleteQuietly(Paths.get(rootPath, relativeTablePath).toFile());
+    }
+
+    /** Returns {@code root/<catalog>.catalog/<database>.db/<table>} for the given identifier. */
+    protected static String getRelativeFileStoreTablePath(ObjectIdentifier tableIdentifier) {
+        String catalog = tableIdentifier.getCatalogName();
+        String database = tableIdentifier.getDatabaseName();
+        String table = tableIdentifier.getObjectName();
+        return String.format("root/%s.catalog/%s.db/%s", catalog, database, table);
+    }
+
+    /**
+     * Fluent holder for what a test case expects: a success flag, the expected rows, and the
+     * expected exception type and message. Every setter returns this instance for chaining.
+     */
+    protected static class ExpectedResult {
+        protected boolean success;
+        protected List<Row> expectedRecords;
+        protected Class<? extends Throwable> expectedType;
+        protected String expectedMessage;
+
+        ExpectedResult success(boolean success) {
+            this.success = success;
+            return this;
+        }
+
+        ExpectedResult expectedRecords(List<Row> expectedRecords) {
+            this.expectedRecords = expectedRecords;
+            return this;
+        }
+
+        ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
+            this.expectedType = exceptionClazz;
+            return this;
+        }
+
+        ExpectedResult expectedMessage(String exceptionMessage) {
+            this.expectedMessage = exceptionMessage;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "ExpectedResult{success=%s, expectedRecords=%s, expectedType=%s, expectedMessage='%s'}",
+                    success, expectedRecords, expectedType, expectedMessage);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
index d6d1ad9..4cd6170 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
+import java.util.HashSet;
+import java.util.Set;
+
/** Options for merge tree. */
public class MergeTreeOptions {
@@ -129,4 +132,17 @@ public class MergeTreeOptions {
config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT),
config.get(COMPACTION_SIZE_RATIO));
}
+
+ /**
+  * Returns a new mutable set holding the eight merge tree options listed in the body.
+  *
+  * @return a fresh {@link HashSet}; modifying it does not affect later calls
+  */
+ public static Set<ConfigOption<?>> allOptions() {
+     final ConfigOption<?>[] declared = {
+         WRITE_BUFFER_SIZE,
+         PAGE_SIZE,
+         TARGET_FILE_SIZE,
+         NUM_SORTED_RUNS_MAX,
+         NUM_LEVELS,
+         COMMIT_FORCE_COMPACT,
+         COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT,
+         COMPACTION_SIZE_RATIO
+     };
+     final Set<ConfigOption<?>> result = new HashSet<>();
+     for (ConfigOption<?> option : declared) {
+         result.add(option);
+     }
+     return result;
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index f501f59..2875e4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -29,6 +30,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
@@ -174,6 +177,16 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
throw new RuntimeException("Unsupported expression: " + expression.asSummaryString());
}
+ /**
+  * Converts every filter with {@link #CONVERTER} and joins the resulting predicates with
+  * {@link And}, folding from left to right.
+  *
+  * @param filters the resolved filter expressions; may be {@code null}
+  * @return the combined predicate, or {@code null} when {@code filters} is {@code null} or empty
+  */
+ @Nullable
+ public static Predicate convert(List<ResolvedExpression> filters) {
+     if (filters == null) {
+         return null;
+     }
+     Optional<Predicate> conjunction =
+             filters.stream()
+                     .map(filter -> filter.accept(PredicateConverter.CONVERTER))
+                     .reduce(And::new);
+     return conjunction.orElse(null);
+ }
+
/** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
public static class UnsupportedExpression extends RuntimeException {}
}