You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/23 07:11:49 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28642] Cannot create sink for Temporary table in table store catalog
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 8fa38421 [FLINK-28642] Cannot create sink for Temporary table in table store catalog
8fa38421 is described below
commit 8fa384217f974f69356f301ee5801220146b8e95
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Sat Jul 23 15:11:44 2022 +0800
[FLINK-28642] Cannot create sink for Temporary table in table store catalog
This closes #232
---
.../store/connector/AbstractTableStoreFactory.java | 6 ++--
.../connector/TableStoreConnectorFactory.java | 37 ++++++++++++++++++++--
2 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 4f729bf6..34c71100 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -58,7 +60,7 @@ public abstract class AbstractTableStoreFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@Override
- public TableStoreSource createDynamicTableSource(Context context) {
+ public DynamicTableSource createDynamicTableSource(Context context) {
return new TableStoreSource(
context.getObjectIdentifier(),
buildFileStoreTable(context),
@@ -69,7 +71,7 @@ public abstract class AbstractTableStoreFactory
}
@Override
- public TableStoreSink createDynamicTableSink(Context context) {
+ public DynamicTableSink createDynamicTableSink(Context context) {
return new TableStoreSink(
context.getObjectIdentifier(),
buildFileStoreTable(context),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 145cff7a..69e738e7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -18,7 +18,10 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.file.catalog.CatalogLock;
@@ -45,9 +48,39 @@ public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
}
@Override
- public TableStoreSink createDynamicTableSink(Context context) {
- TableStoreSink sink = super.createDynamicTableSink(context);
+ public DynamicTableSource createDynamicTableSource(Context context) { // builds the source, delegating when the table names a different connector
+ if (isFlinkTable(context)) {
+ // Only a Flink 1.14 temporary table will come here; its creation is handed to FactoryUtil instead of the table store path.
+ return FactoryUtil.createTableSource(
+ null, // no catalog instance is supplied for this lookup
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+ return super.createDynamicTableSource(context); // regular path: source created by the parent factory
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) { // builds the sink, delegating when the table names a different connector
+ if (isFlinkTable(context)) {
+ // Only a Flink 1.14 temporary table will come here; its creation is handed to FactoryUtil instead of the table store path.
+ return FactoryUtil.createTableSink(
+ null, // no catalog instance is supplied for this lookup
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+ TableStoreSink sink = (TableStoreSink) super.createDynamicTableSink(context); // parent now declares DynamicTableSink, so cast back to reach setLockFactory
sink.setLockFactory(lockFactory); // hand this factory's lockFactory to the sink before returning it
return sink;
}
+
+ private boolean isFlinkTable(Context context) { // true when the table's options name a connector other than this factory
+ String identifier = context.getCatalogTable().getOptions().get(FactoryUtil.CONNECTOR.key()); // value of the FactoryUtil.CONNECTOR option; null when the option is absent
+ return identifier != null && !IDENTIFIER.equals(identifier); // an absent option is not treated as a Flink table
+ }
}