You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/23 07:11:49 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28642] Cannot create sink for Temporary table in table store catalog

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 8fa38421 [FLINK-28642] Cannot create sink for Temporary table in table store catalog
8fa38421 is described below

commit 8fa384217f974f69356f301ee5801220146b8e95
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Sat Jul 23 15:11:44 2022 +0800

    [FLINK-28642] Cannot create sink for Temporary table in table store catalog
    
    This closes #232
---
 .../store/connector/AbstractTableStoreFactory.java |  6 ++--
 .../connector/TableStoreConnectorFactory.java      | 37 ++++++++++++++++++++--
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index 4f729bf6..34c71100 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -58,7 +60,7 @@ public abstract class AbstractTableStoreFactory
         implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
     @Override
-    public TableStoreSource createDynamicTableSource(Context context) {
+    public DynamicTableSource createDynamicTableSource(Context context) {
         return new TableStoreSource(
                 context.getObjectIdentifier(),
                 buildFileStoreTable(context),
@@ -69,7 +71,7 @@ public abstract class AbstractTableStoreFactory
     }
 
     @Override
-    public TableStoreSink createDynamicTableSink(Context context) {
+    public DynamicTableSink createDynamicTableSink(Context context) {
         return new TableStoreSink(
                 context.getObjectIdentifier(),
                 buildFileStoreTable(context),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 145cff7a..69e738e7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 
@@ -45,9 +48,39 @@ public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
     }
 
     @Override
-    public TableStoreSink createDynamicTableSink(Context context) {
-        TableStoreSink sink = super.createDynamicTableSink(context);
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        if (isFlinkTable(context)) {
+            // Only a temporary table on Flink 1.14 reaches this branch.
+            return FactoryUtil.createTableSource(
+                    null,
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable(),
+                    context.getConfiguration(),
+                    context.getClassLoader(),
+                    context.isTemporary());
+        }
+        return super.createDynamicTableSource(context);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        if (isFlinkTable(context)) {
+            // Only a temporary table on Flink 1.14 reaches this branch.
+            return FactoryUtil.createTableSink(
+                    null,
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable(),
+                    context.getConfiguration(),
+                    context.getClassLoader(),
+                    context.isTemporary());
+        }
+        TableStoreSink sink = (TableStoreSink) super.createDynamicTableSink(context);
         sink.setLockFactory(lockFactory);
         return sink;
     }
+
+    private boolean isFlinkTable(Context context) {
+        String identifier = context.getCatalogTable().getOptions().get(FactoryUtil.CONNECTOR.key());
+        return identifier != null && !IDENTIFIER.equals(identifier);
+    }
 }