Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/22 03:50:46 UTC
[flink] 03/06: [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69d8816d164a106f8edf61a768569dafa5b0dc8d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Feb 10 15:35:42 2020 +0800
[FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
---
.../flink/table/factories/TableFactoryUtil.java | 39 ++++++++++
.../table/planner/delegation/PlannerBase.scala | 16 ++--
.../planner/plan/schema/CatalogSourceTable.scala | 48 ++++++------
.../table/planner/plan/batch/sql/TableScanTest.xml | 8 +-
.../utils/TestCollectionTableFactory.scala | 53 ++++---------
.../table/planner/plan/batch/sql/SinkTest.scala | 31 --------
.../planner/plan/common/TableFactoryTest.scala | 88 ++++++++++++++++++++++
.../plan/utils/TestContextTableFactory.scala | 69 +++++++++++++++++
8 files changed, 245 insertions(+), 107 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 57ca4cf..e33d55c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -34,6 +34,34 @@ import java.util.Optional;
public class TableFactoryUtil {
/**
+ * Returns a table source matching the context.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
+ try {
+ return TableFactoryService
+ .find(TableSourceFactory.class, context.getTable().toProperties())
+ .createTableSource(context);
+ } catch (Throwable t) {
+ throw new TableException("findAndCreateTableSource failed.", t);
+ }
+ }
+
+ /**
+ * Returns a table sink matching the context.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
+ try {
+ return TableFactoryService
+ .find(TableSinkFactory.class, context.getTable().toProperties())
+ .createTableSink(context);
+ } catch (Throwable t) {
+ throw new TableException("findAndCreateTableSink failed.", t);
+ }
+ }
+
+ /**
* Returns a table source matching the properties.
*/
@SuppressWarnings("unchecked")
@@ -86,4 +114,15 @@ public class TableFactoryUtil {
return Optional.empty();
}
+ /**
+ * Creates a table sink for a {@link CatalogTable} using the table factory associated with the catalog.
+ */
+ public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
+ TableFactory tableFactory = catalog.getTableFactory().orElse(null);
+ if (tableFactory instanceof TableSinkFactory) {
+ return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
+ }
+ return Optional.empty();
+ }
+
}
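
For reference, a minimal sketch of how the two new context-based entry points are meant to be called (the object and method names here are hypothetical; the two Context implementations are the same ones this commit wires up in PlannerBase and CatalogSourceTable below):

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl, TableSourceFactoryContextImpl}
import org.apache.flink.types.Row

object ContextLookupSketch {
  // Wrap the catalog metadata into a factory Context, then let
  // TableFactoryUtil discover and invoke a matching factory via SPI.
  def createSourceAndSink(identifier: ObjectIdentifier, table: CatalogTable, conf: Configuration): Unit = {
    val source = TableFactoryUtil.findAndCreateTableSource[Row](
      new TableSourceFactoryContextImpl(identifier, table, conf))
    val sink = TableFactoryUtil.findAndCreateTableSink[Row](
      new TableSinkFactoryContextImpl(identifier, table, conf))
  }
}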
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index ab084e5..47892a1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.catalog._
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
import org.apache.flink.table.operations._
import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
-import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink}
+import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import org.apache.flink.table.utils.TableSchemaUtils
@@ -296,19 +296,15 @@ abstract class PlannerBase(
case Some(s) if s.isInstanceOf[CatalogTable] =>
val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
val table = s.asInstanceOf[CatalogTable]
+ val context = new TableSinkFactoryContextImpl(
+ objectIdentifier, table, getTableConfig.getConfiguration)
if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
- val objectPath = objectIdentifier.toObjectPath
- val sink = TableFactoryUtil.createTableSinkForCatalogTable(
- catalog.get(),
- table,
- objectPath)
+ val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
if (sink.isPresent) {
return Option(table, sink.get())
}
}
- val sinkProperties = table.toProperties
- Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
- .createTableSink(sinkProperties))
+ Option(table, TableFactoryUtil.findAndCreateTableSink(context))
case _ => None
}
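
The hunk above gives sinks a two-step lookup order. The same logic as a standalone sketch (object and method names hypothetical, assuming a catalog and a factory Context are already in scope):

import java.util.Optional

import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory}
import org.apache.flink.table.sinks.TableSink

object SinkLookupSketch {
  // 1. A TableSinkFactory provided by the catalog itself wins;
  // 2. otherwise fall back to SPI discovery through TableFactoryUtil.
  def lookupSink(catalog: Optional[Catalog], context: TableSinkFactory.Context): TableSink[_] = {
    if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
      val fromCatalog = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
      if (fromCatalog.isPresent) {
        return fromCatalog.get()
      }
    }
    TableFactoryUtil.findAndCreateTableSink[Any](context)
  }
}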
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..9a82a8d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,12 +18,14 @@
package org.apache.flink.table.planner.plan.schema
+import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.TableException
import org.apache.flink.table.catalog.CatalogTable
-import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
import org.apache.flink.table.planner.catalog.CatalogSchemaTable
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
+import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
import org.apache.calcite.rel.RelNode
@@ -62,12 +64,23 @@ class CatalogSourceTable[T](
.toMap
}
- lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]
-
- override def getQualifiedName: JList[String] = explainSourceAsString(tableSource)
+ override def getQualifiedName: JList[String] = {
+ // Do not explain the source here: we already have full names, and the table source should be created in toRel.
+ val ret = new util.ArrayList[String](names)
+ // Add the class name to distinguish this from TableSourceTable.
+ val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
+ ret.add(s"catalog_source: [$name]")
+ ret
+ }
override def toRel(context: RelOptTable.ToRelContext): RelNode = {
val cluster = context.getCluster
+ val flinkContext = cluster
+ .getPlanner
+ .getContext
+ .unwrap(classOf[FlinkContext])
+
+ val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
val tableSourceTable = new TableSourceTable[T](
relOptSchema,
schemaTable.getTableIdentifier,
@@ -91,11 +104,7 @@ class CatalogSourceTable[T](
val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
relBuilder.push(scan)
- val toRexFactory = cluster
- .getPlanner
- .getContext
- .unwrap(classOf[FlinkContext])
- .getSqlExprToRexConverterFactory
+ val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
// 2. push computed column project
val fieldNames = rowType.getFieldNames.asScala
@@ -140,32 +149,25 @@ class CatalogSourceTable[T](
relBuilder.build()
}
- /** Create the table source lazily. */
- private def findAndCreateTableSource(): TableSource[_] = {
+ /** Create the table source. */
+ private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = {
val tableFactoryOpt = schemaTable.getTableFactory
+ val context = new TableSourceFactoryContextImpl(
+ schemaTable.getTableIdentifier, catalogTable, conf)
val tableSource = if (tableFactoryOpt.isPresent) {
tableFactoryOpt.get() match {
case tableSourceFactory: TableSourceFactory[_] =>
- tableSourceFactory.createTableSource(
- schemaTable.getTableIdentifier.toObjectPath,
- catalogTable)
+ tableSourceFactory.createTableSource(context)
case _ => throw new TableException("Cannot query a sink-only table. "
+ "TableFactory provided by catalog must implement TableSourceFactory")
}
} else {
- TableFactoryUtil.findAndCreateTableSource(catalogTable)
+ TableFactoryUtil.findAndCreateTableSource(context)
}
if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
throw new TableException("Catalog tables support only "
+ "StreamTableSource and InputFormatTableSource")
}
- tableSource
- }
-
- override protected def explainSourceAsString(ts: TableSource[_]): JList[String] = {
- val ret = new util.ArrayList[String](super.explainSourceAsString(ts))
- // Add class name to distinguish TableSourceTable.
- ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName)
- ret
+ tableSource.asInstanceOf[TableSource[T]]
}
}
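
Since the source is no longer created eagerly, the plan digest entry now comes from generateRuntimeName rather than from the TableSource's explainSourceAsString. A small sketch of the string it produces (the field names are illustrative), matching the updated plans in TableScanTest.xml below:

import org.apache.flink.table.planner.plan.schema.CatalogSourceTable
import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName

object QualifiedNameSketch {
  // Renders "<simple class name>(<field names>)", e.g. "CatalogSourceTable(a, b)",
  // which is the entry that replaces the old "source: [...], class: ..." suffix
  // in the plan digests.
  val name: String = generateRuntimeName(classOf[CatalogSourceTable[_]], Array("a", "b"))
}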
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 39a67ca..63bea00 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
<TestCase name="testTableApiScanWithDDL">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
<TestCase name="testTableApiScanWithTemporaryTable">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
<TestCase name="testTableApiScanWithWatermark">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="planAfter">
@@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
<TestCase name="testTableApiScanWithComputedColumn">
<Resource name="planBefore">
<![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 01c8fb8..170f118 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
@@ -30,12 +29,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
@@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JL
import scala.collection.JavaConversions._
-class TestCollectionTableFactory
- extends StreamTableSourceFactory[Row]
- with StreamTableSinkFactory[Row]
- with BatchTableSourceFactory[Row]
- with BatchTableSinkFactory[Row]
-{
+class TestCollectionTableFactory extends TableSourceFactory[Row] with TableSinkFactory[Row] {
- override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
- getCollectionSource(properties)
+ override def createTableSource(
+ context: TableSourceFactory.Context): StreamTableSource[Row] = {
+ getCollectionSource(context)
}
- override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
- getCollectionSink(properties)
- }
-
- override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
- getCollectionSource(properties)
- }
-
- override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
- getCollectionSink(properties)
- }
-
- override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
- getCollectionSource(properties)
- }
-
- override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
- getCollectionSink(properties)
+ override def createTableSink(
+ context: TableSinkFactory.Context): StreamTableSink[Row] = {
+ getCollectionSink(context)
}
override def requiredContext(): JMap[String, String] = {
@@ -118,18 +97,14 @@ object TestCollectionTableFactory {
def getResult: util.List[Row] = RESULT
- def getCollectionSource(props: JMap[String, String]): CollectionTableSource = {
- val properties = new DescriptorProperties()
- properties.putProperties(props)
- val schema = properties.getTableSchema(Schema.SCHEMA)
- val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true)
+ def getCollectionSource(context: TableSourceFactory.Context): CollectionTableSource = {
+ val schema = context.getTable.getSchema
+ val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, "true").toBoolean
new CollectionTableSource(emitIntervalMS, physicalSchema(schema), isBounded)
}
- def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
- val properties = new DescriptorProperties()
- properties.putProperties(props)
- val schema = properties.getTableSchema(Schema.SCHEMA)
+ def getCollectionSink(context: TableSinkFactory.Context): CollectionTableSink = {
+ val schema = context.getTable.getSchema
new CollectionTableSink(physicalSchema(schema))
}
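
The migration pattern above in isolation: a context-based factory reads typed metadata directly off the CatalogTable instead of re-parsing a flat property map with DescriptorProperties. A sketch under that assumption (object and parameter names hypothetical; boundedKey stands in for a factory-specific option such as IS_BOUNDED):

import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.factories.TableSourceFactory

object ContextMetadataSketch {
  // Typed schema and options come straight from the CatalogTable carried
  // by the Context; no DescriptorProperties round trip is needed.
  def readMetadata(context: TableSourceFactory.Context, boundedKey: String): (TableSchema, Boolean) = {
    val schema = context.getTable.getSchema
    val isBounded = context.getTable.getProperties.getOrDefault(boundedKey, "true").toBoolean
    (schema, isBounded)
  }
}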
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
index e04f9eb..4a91621 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
@@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, TableSchema}
-import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
-import org.apache.flink.table.factories.TableSinkFactory
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.logical.{BigIntType, IntType}
import org.junit.Test
-import org.mockito.{ArgumentMatchers, Mockito}
-
-import java.util.Optional
-
-import scala.collection.JavaConverters._
class SinkTest extends TableTestBase {
@@ -68,26 +59,4 @@ class SinkTest extends TableTestBase {
util.verifyPlan()
}
-
- @Test
- def testCatalogTableSink(): Unit = {
- val schemaBuilder = new TableSchema.Builder()
- schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
- val schema = schemaBuilder.build()
- val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
- val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
- val factory = Mockito.mock(classOf[TableSinkFactory[_]])
- Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
- Mockito.when[TableSink[_]](factory.createTableSink(
- ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
- util.tableEnv.registerCatalog(catalog.getName, catalog)
- util.tableEnv.useCatalog(catalog.getName)
- val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
- catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
- util.tableEnv.sqlQuery("select 1").insertInto("tbl")
- util.tableEnv.explain(false)
- // verify we tried to get table factory from catalog
- Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
new file mode 100644
index 0000000..9661bfc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+@RunWith(classOf[Parameterized])
+class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
+
+ private val util = if (isBatch) batchTestUtil() else streamTestUtil()
+
+ @Test
+ def testTableSourceSinkFactory(): Unit = {
+ val factory = new TestContextTableFactory(
+ ObjectIdentifier.of("cat", "default", "t1"),
+ ObjectIdentifier.of("cat", "default", "t2"))
+ util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true)
+ util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
+ override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
+ })
+ util.tableEnv.useCatalog("cat")
+
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b varchar,
+ | c as a + 1
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b as c - 1,
+ | c int
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select t1.a, t1.c from t1
+ """.stripMargin
+ util.tableEnv.sqlUpdate(sourceDDL)
+ util.tableEnv.sqlUpdate(sinkDDL)
+ util.tableEnv.sqlUpdate(query)
+
+ util.tableEnv.explain(false)
+ Assert.assertTrue(factory.hasInvokedSource)
+ Assert.assertTrue(factory.hasInvokedSink)
+ }
+}
+
+object TableFactoryTest {
+ @Parameterized.Parameters(name = "isBatch: {0}")
+ def parameters(): java.util.Collection[Boolean] = {
+ java.util.Arrays.asList(true, false)
+ }
+}
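
The REQUIRED_KEY round trip this test relies on, shown in isolation (a standalone sketch, not part of the commit): an option set on the TableEnvironment's Configuration surfaces in the factory through context.getConfiguration.

import org.apache.flink.configuration.{ConfigOption, ConfigOptions, Configuration}

object RequiredKeySketch extends App {
  val requiredKey: ConfigOption[java.lang.Boolean] =
    ConfigOptions.key("testing.required.key").booleanType().defaultValue(false)
  val conf = new Configuration
  // The test sets the flag on the TableEnvironment configuration ...
  conf.setBoolean(requiredKey, true)
  // ... and the factory reads the same flag back from its Context.
  assert(conf.get(requiredKey))
}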
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..9e60cd7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+ * A test [[TableSourceFactory]] and [[TableSinkFactory]] that verify the context they are invoked with.
+ */
+class TestContextTableFactory[T](
+ sourceIdentifier: ObjectIdentifier,
+ sinkIdentifier: ObjectIdentifier)
+ extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+ var hasInvokedSource = false
+ var hasInvokedSink = false
+
+ override def requiredContext(): util.Map[String, String] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = {
+ Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+ Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+ hasInvokedSource = true
+ TableFactoryUtil.findAndCreateTableSource(context)
+ }
+
+ override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = {
+ Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+ Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+ hasInvokedSink = true
+ TableFactoryUtil.findAndCreateTableSink(context)
+ }
+}
+
+object TestContextTableFactory {
+ val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+ .key("testing.required.key").booleanType().defaultValue(false)
+}