You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/23 04:56:51 UTC
[flink] branch release-1.15 updated: [FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 6e63e6c [FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'
6e63e6c is described below
commit 6e63e6c2ab074f070389a0eae181269cfbc82772
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Mar 23 12:55:47 2022 +0800
[FLINK-26805][table] Managed table breaks legacy connector without 'connector.type'
This closes #19201
---
.../flink/table/catalog/ManagedTableListener.java | 16 +++--
.../flink/table/factories/TableFactoryUtil.java | 35 ++++++++++
.../table/planner/delegation/PlannerBase.scala | 58 ++++------------
.../connector/file/table/LegacyTableFactory.java | 78 ++++++++++++++++++++++
.../plan/stream/sql/LegacyTableFactoryTest.java | 46 +++++++++++++
.../org.apache.flink.table.factories.TableFactory | 1 +
.../plan/stream/sql/LegacyTableFactoryTest.xml | 52 +++++++++++++++
7 files changed, 238 insertions(+), 48 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
index d2598f4..80f7c95 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ManagedTableListener.java
@@ -20,13 +20,14 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -110,7 +111,8 @@ public class ManagedTableListener {
return false;
}
- if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+ if (table.getTableKind() != CatalogBaseTable.TableKind.TABLE
+ || !(table instanceof CatalogTable)) {
// view is not managed table
return false;
}
@@ -123,8 +125,14 @@ public class ManagedTableListener {
return false;
}
- if (!StringUtils.isNullOrWhitespaceOnly(
- options.get(ConnectorDescriptorValidator.CONNECTOR_TYPE))) {
+ // check for a legacy connector: only the factory lookup matters here, the other arguments are dummies
+ if (TableFactoryUtil.isLegacyConnectorOptions(
+ catalog,
+ new Configuration(),
+ true,
+ ObjectIdentifier.of("dummy_catalog", "dummy_database", "dummy_table"),
+ (CatalogTable) table,
+ true)) {
// legacy connector is not managed table
return false;
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 7adc476..440e568 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
@@ -129,4 +131,37 @@ public class TableFactoryUtil {
}
return Optional.empty();
}
+
+ /** Checks whether the {@link CatalogTable} uses legacy connector sink options. */
+ public static boolean isLegacyConnectorOptions(
+ @Nullable Catalog catalog,
+ ReadableConfig configuration,
+ boolean isStreamingMode,
+ ObjectIdentifier objectIdentifier,
+ CatalogTable catalogTable,
+ boolean isTemporary) {
+ // normalize option keys
+ DescriptorProperties properties = new DescriptorProperties(true);
+ properties.putProperties(catalogTable.getOptions());
+ if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {
+ return true;
+ } else {
+ try {
+ // try to create a legacy table sink using the options,
+ // some legacy factories may use the 'type' key
+ TableFactoryUtil.findAndCreateTableSink(
+ catalog,
+ objectIdentifier,
+ catalogTable,
+ configuration,
+ isStreamingMode,
+ isTemporary);
+ // success, then we will use the legacy factories
+ return true;
+ } catch (Throwable ignore) {
+ // fail, then we will use new factories
+ return false;
+ }
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index a3a5c24..5994292 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -18,18 +18,23 @@
package org.apache.flink.table.planner.delegation
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.{RelTrait, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.tools.FrameworkConfig
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamGraph
import org.apache.flink.table.api._
-import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
+import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties}
import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil}
import org.apache.flink.table.module.{Module, ModuleManager}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
@@ -59,17 +64,9 @@ import org.apache.flink.table.runtime.generated.CompileUtils
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.{RelTrait, RelTraitDef}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.LogicalTableModify
-import org.apache.calcite.tools.FrameworkConfig
-
import java.lang.{Long => JLong}
import java.util
import java.util.{Collections, TimeZone}
-
import _root_.scala.collection.JavaConversions._
import scala.collection.mutable
@@ -386,7 +383,13 @@ abstract class PlannerBase(
}
if (!contextResolvedTable.isAnonymous &&
- isLegacyConnectorOptions(objectIdentifier, resolvedTable.getOrigin, isTemporary)) {
+ TableFactoryUtil.isLegacyConnectorOptions(
+ catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
+ tableConfig,
+ isStreamingMode,
+ objectIdentifier,
+ resolvedTable.getOrigin,
+ isTemporary)) {
val tableSink = TableFactoryUtil.findAndCreateTableSink(
catalog.orNull,
objectIdentifier,
@@ -423,39 +426,6 @@ abstract class PlannerBase(
}
}
- /**
- * Checks whether the [[CatalogTable]] uses legacy connector sink options.
- */
- private def isLegacyConnectorOptions(
- objectIdentifier: ObjectIdentifier,
- catalogTable: CatalogTable,
- isTemporary: Boolean) = {
- // normalize option keys
- val properties = new DescriptorProperties(true)
- properties.putProperties(catalogTable.getOptions)
- if (properties.containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE)) {
- true
- } else {
- val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
- try {
- // try to create legacy table source using the options,
- // some legacy factories uses the new 'connector' key
- TableFactoryUtil.findAndCreateTableSink(
- catalog.orElse(null),
- objectIdentifier,
- catalogTable,
- getTableConfig,
- isStreamingMode,
- isTemporary)
- // success, then we will use the legacy factories
- true
- } catch {
- // fail, then we will use new factories
- case _: Throwable => false
- }
- }
- }
-
protected def createSerdeContext: SerdeContext = {
val planner = createFlinkPlanner
new SerdeContext(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java
new file mode 100644
index 0000000..c0b0389
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/LegacyTableFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestingAppendTableSink;
+import org.apache.flink.table.planner.utils.TestTableSource;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/** A legacy {@link TableFactory} that uses user-defined options. */
+public class LegacyTableFactory
+ implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {
+
+ @Override
+ public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+ DescriptorProperties dp = new DescriptorProperties();
+ dp.putProperties(properties);
+ TableSchema tableSchema = dp.getTableSchema(SCHEMA);
+ StreamTableSink<Row> sink = new TestingAppendTableSink();
+ return (StreamTableSink)
+ sink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
+ }
+
+ @Override
+ public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+ DescriptorProperties dp = new DescriptorProperties();
+ dp.putProperties(properties);
+ TableSchema tableSchema = dp.getTableSchema(SCHEMA);
+ return new TestTableSource(false, tableSchema);
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> options = new HashMap<>();
+ options.put("type", "legacy");
+ return options;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+ // schema
+ properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
+ return properties;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java
new file mode 100644
index 0000000..2984e74
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.connector.file.table.LegacyTableFactory;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/** Tests for usages of {@link LegacyTableFactory}. */
+public class LegacyTableFactoryTest extends TableTestBase {
+
+ private final JavaStreamTableTestUtil util;
+
+ public LegacyTableFactoryTest() {
+ util = javaStreamTestUtil();
+ util.tableEnv().executeSql("CREATE TABLE T (a INT) WITH ('type'='legacy')");
+ }
+
+ @Test
+ public void testSelect() {
+ util.verifyExecPlan("SELECT * FROM T");
+ }
+
+ @Test
+ public void testInsert() {
+ util.verifyExecPlanInsert("INSERT INTO T VALUES (1)");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 9a8051b..b929b23 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -35,3 +35,4 @@ org.apache.flink.table.planner.utils.TestDataTypeTableSourceWithTimeFactory
org.apache.flink.table.planner.utils.TestStreamTableSourceFactory
org.apache.flink.table.planner.utils.TestFileInputFormatTableSourceFactory
org.apache.flink.table.planner.utils.TestTableSourceWithTimeFactory
+org.apache.flink.connector.file.table.LegacyTableFactory
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml
new file mode 100644
index 0000000..9993c59
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LegacyTableFactoryTest.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testInsert">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO T VALUES (1)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`T`], fields=[a])
++- LogicalValues(tuples=[[{ 1 }]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+LegacySink(name=[`default_catalog`.`default_database`.`T`], fields=[a])
++- Values(tuples=[[{ 1 }]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSelect">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a)]]], fields=[a])
+]]>
+ </Resource>
+ </TestCase>
+</Root>