Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 04:56:51 UTC

[flink] branch release-1.15 updated: [FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 6e63e6c  [FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'
6e63e6c is described below

commit 6e63e6c2ab074f070389a0eae181269cfbc82772
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 23 12:55:47 2022 +0800

    [FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'
    
    This closes #19201
---
 .../flink/table/catalog/ManagedTableListener.java  | 16 +++--
 .../flink/table/factories/TableFactoryUtil.java    | 35 ++++++++++
 .../table/planner/delegation/PlannerBase.scala     | 58 ++++------------
 .../connector/file/table/LegacyTableFactory.java   | 78 ++++++++++++++++++++++
 .../plan/stream/sql/LegacyTableFactoryTest.java    | 46 +++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |  1 +
 .../plan/stream/sql/LegacyTableFactoryTest.xml     | 52 +++++++++++++++
 7 files changed, 238 insertions(+), 48 deletions(-)
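
Before this change, ManagedTableListener classified a table as a legacy
connector only when the normalized 'connector.type' option was present.
Legacy TableFactory implementations that match on other keys (such as the
'type' key used by the test factory added here) were therefore treated as
managed tables. A minimal reproduction sketch in Java, assuming a
TableEnvironment named tableEnv and the test-only 'legacy' factory
registered in this commit:

    // Broken before this commit: without 'connector.type' the listener
    // assumed a managed table rather than a legacy connector.
    tableEnv.executeSql("CREATE TABLE T (a INT) WITH ('type'='legacy')");
    // With the fix, reads and writes resolve through the legacy factory.
    tableEnv.executeSql("INSERT INTO T VALUES (1)");
    tableEnv.executeSql("SELECT * FROM T");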

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
index d2598f4..80f7c95 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -20,13 +20,14 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
@@ -110,7 +111,8 @@ public class ManagedTableListener {
             return false;
         }
 
-        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+        if (table.getTableKind() != CatalogBaseTable.TableKind.TABLE
+                || !(table instanceof CatalogTable)) {
             // view is not managed table
             return false;
         }
@@ -123,8 +125,14 @@ public class ManagedTableListener {
             return false;
         }
 
-        if (!StringUtils.isNullOrWhitespaceOnly(
-                options.get(ConnectorDescriptorValidator.CONNECTOR_TYPE))) {
+        // check for a legacy connector; only the factory lookup matters here, the other properties are dummies
+        if (TableFactoryUtil.isLegacyConnectorOptions(
+                catalog,
+                new Configuration(),
+                true,
+                ObjectIdentifier.of("dummy_catalog", "dummy_database", "dummy_table"),
+                (CatalogTable) table,
+                true)) {
             // legacy connector is not managed table
             return false;
         }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 7adc476..440e568 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
@@ -129,4 +131,37 @@ public class TableFactoryUtil {
         }
         return Optional.empty();
     }
+
+    /** Checks whether the {@link CatalogTable} uses legacy connector sink options. */
+    public static boolean isLegacyConnectorOptions(
+            @Nullable Catalog catalog,
+            ReadableConfig configuration,
+            boolean isStreamingMode,
+            ObjectIdentifier objectIdentifier,
+            CatalogTable catalogTable,
+            boolean isTemporary) {
+        // normalize option keys
+        DescriptorProperties properties = new DescriptorProperties(true);
+        properties.putProperties(catalogTable.getOptions());
+        if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {
+            return true;
+        } else {
+            try {
+                // try to create a legacy table sink using the options;
+                // some legacy factories may use the 'type' key
+                TableFactoryUtil.findAndCreateTableSink(
+                        catalog,
+                        objectIdentifier,
+                        catalogTable,
+                        configuration,
+                        isStreamingMode,
+                        isTemporary);
+                // success, so the legacy factories will be used
+                return true;
+            } catch (Throwable ignore) {
+                // failure, so the new factories will be used
+                return false;
+            }
+        }
+    }
 }
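
The helper above first looks for the normalized 'connector.type' key and,
failing that, probes legacy factory discovery: if a legacy table sink can
be created from the options, they are treated as legacy connector options,
and any Throwable from the probe means the new factory stack should be
used instead. A hedged usage sketch (the surrounding identifiers are
illustrative; only the call itself comes from this commit):

    // Probe with dummy settings; only factory resolution matters here.
    boolean isLegacy =
            TableFactoryUtil.isLegacyConnectorOptions(
                    null, // no catalog
                    new Configuration(),
                    true, // streaming mode
                    ObjectIdentifier.of("cat", "db", "t"),
                    catalogTable,
                    true); // temporary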
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index a3a5c24..5994292 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -18,18 +18,23 @@
 
 package org.apache.flink.table.planner.delegation
 
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.{RelTrait, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.tools.FrameworkConfig
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties}
 import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil}
 import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
@@ -59,17 +64,9 @@ import org.apache.flink.table.runtime.generated.CompileUtils
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.{RelTrait, RelTraitDef}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.LogicalTableModify
-import org.apache.calcite.tools.FrameworkConfig
-
 import java.lang.{Long => JLong}
 import java.util
 import java.util.{Collections, TimeZone}
-
 import _root_.scala.collection.JavaConversions._
 import scala.collection.mutable
 
@@ -386,7 +383,13 @@ abstract class PlannerBase(
         }
 
         if (!contextResolvedTable.isAnonymous &&
-          isLegacyConnectorOptions(objectIdentifier, resolvedTable.getOrigin, isTemporary)) {
+          TableFactoryUtil.isLegacyConnectorOptions(
+            catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
+            tableConfig,
+            isStreamingMode,
+            objectIdentifier,
+            resolvedTable.getOrigin,
+            isTemporary)) {
           val tableSink = TableFactoryUtil.findAndCreateTableSink(
             catalog.orNull,
             objectIdentifier,
@@ -423,39 +426,6 @@ abstract class PlannerBase(
     }
   }
 
-  /**
-   * Checks whether the [[CatalogTable]] uses legacy connector sink options.
-   */
-  private def isLegacyConnectorOptions(
-      objectIdentifier: ObjectIdentifier,
-      catalogTable: CatalogTable,
-      isTemporary: Boolean) = {
-    // normalize option keys
-    val properties = new DescriptorProperties(true)
-    properties.putProperties(catalogTable.getOptions)
-    if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {
-      true
-    } else {
-      val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
-      try {
-        // try to create legacy table source using the options,
-        // some legacy factories uses the new 'connector' key
-        TableFactoryUtil.findAndCreateTableSink(
-          catalog.orElse(null),
-          objectIdentifier,
-          catalogTable,
-          getTableConfig,
-          isStreamingMode,
-          isTemporary)
-        // success, then we will use the legacy factories
-        true
-      } catch {
-        // fail, then we will use new factories
-        case _: Throwable => false
-      }
-    }
-  }
-
   protected def createSerdeContext: SerdeContext = {
     val planner = createFlinkPlanner
     new SerdeContext(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java
new file mode 100644
index 0000000..c0b0389
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestingAppendTableSink;
+import org.apache.flink.table.planner.utils.TestTableSource;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/** A legacy {@link TableFactory} that uses user-defined options. */
+public class LegacyTableFactory
+        implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {
+
+    @Override
+    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+        DescriptorProperties dp = new DescriptorProperties();
+        dp.putProperties(properties);
+        TableSchema tableSchema = dp.getTableSchema(SCHEMA);
+        StreamTableSink<Row> sink = new TestingAppendTableSink();
+        return (StreamTableSink)
+                sink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
+    }
+
+    @Override
+    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+        DescriptorProperties dp = new DescriptorProperties();
+        dp.putProperties(properties);
+        TableSchema tableSchema = dp.getTableSchema(SCHEMA);
+        return new TestTableSource(false, tableSchema);
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> options = new HashMap<>();
+        options.put("type", "legacy");
+        return options;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        List<String> properties = new ArrayList<>();
+        // schema
+        properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+        properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+        properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+        properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
+        return properties;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java
new file mode 100644
index 0000000..2984e74
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.connector.file.table.LegacyTableFactory;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/** Tests for usages of {@link LegacyTableFactory}. */
+public class LegacyTableFactoryTest extends TableTestBase {
+
+    private final JavaStreamTableTestUtil util;
+
+    public LegacyTableFactoryTest() {
+        util = javaStreamTestUtil();
+        util.tableEnv().executeSql("CREATE TABLE T (a INT) WITH ('type'='legacy')");
+    }
+
+    @Test
+    public void testSelect() {
+        util.verifyExecPlan("SELECT * FROM T");
+    }
+
+    @Test
+    public void testInsert() {
+        util.verifyExecPlanInsert("INSERT INTO T VALUES (1)");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 9a8051b..b929b23 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -35,3 +35,4 @@ org.apache.flink.table.planner.utils.TestDataTypeTableSourceWithTimeFactory
 org.apache.flink.table.planner.utils.TestStreamTableSourceFactory
 org.apache.flink.table.planner.utils.TestFileInputFormatTableSourceFactory
 org.apache.flink.table.planner.utils.TestTableSourceWithTimeFactory
+org.apache.flink.connector.file.table.LegacyTableFactory
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml
new file mode 100644
index 0000000..9993c59
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testInsert">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO T VALUES (1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`T`], fields=[a])
++- LogicalValues(tuples=[[{ 1 }]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+LegacySink(name=[`default_catalog`.`default_database`.`T`], fields=[a])
++- Values(tuples=[[{ 1 }]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSelect">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a)]]], fields=[a])
+]]>
+    </Resource>
+  </TestCase>
+</Root>