You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/22 03:50:46 UTC

[flink] 03/06: [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69d8816d164a106f8edf61a768569dafa5b0dc8d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Feb 10 15:35:42 2020 +0800

    [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
---
 .../flink/table/factories/TableFactoryUtil.java    | 39 ++++++++++
 .../table/planner/delegation/PlannerBase.scala     | 16 ++--
 .../planner/plan/schema/CatalogSourceTable.scala   | 48 ++++++------
 .../table/planner/plan/batch/sql/TableScanTest.xml |  8 +-
 .../utils/TestCollectionTableFactory.scala         | 53 ++++---------
 .../table/planner/plan/batch/sql/SinkTest.scala    | 31 --------
 .../planner/plan/common/TableFactoryTest.scala     | 88 ++++++++++++++++++++++
 .../plan/utils/TestContextTableFactory.scala       | 69 +++++++++++++++++
 8 files changed, 245 insertions(+), 107 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 57ca4cf..e33d55c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -34,6 +34,34 @@ import java.util.Optional;
 public class TableFactoryUtil {
 
 	/**
+	 * Returns a table source matching the descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
+		try {
+			return TableFactoryService
+					.find(TableSourceFactory.class, context.getTable().toProperties())
+					.createTableSource(context);
+		} catch (Throwable t) {
+			throw new TableException("findAndCreateTableSource failed.", t);
+		}
+	}
+
+	/**
+	 * Returns a table sink matching the context.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
+		try {
+			return TableFactoryService
+					.find(TableSinkFactory.class, context.getTable().toProperties())
+					.createTableSink(context);
+		} catch (Throwable t) {
+			throw new TableException("findAndCreateTableSink failed.", t);
+		}
+	}
+
+	/**
 	 * Returns a table source matching the properties.
 	 */
 	@SuppressWarnings("unchecked")
@@ -86,4 +114,15 @@ public class TableFactoryUtil {
 		return Optional.empty();
 	}
 
+	/**
+	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
+	 */
+	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
+		TableFactory tableFactory = catalog.getTableFactory().orElse(null);
+		if (tableFactory instanceof TableSinkFactory) {
+			return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
+		}
+		return Optional.empty();
+	}
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index ab084e5..47892a1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
-import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink}
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.utils.TableSchemaUtils
 
@@ -296,19 +296,15 @@ abstract class PlannerBase(
       case Some(s) if s.isInstanceOf[CatalogTable] =>
         val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val table = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, table, getTableConfig.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val objectPath = objectIdentifier.toObjectPath
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            table,
-            objectPath)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(table, sink.get())
           }
         }
-        val sinkProperties = table.toProperties
-        Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(table, TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..9a82a8d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.CatalogTable
-import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
+import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
 
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
@@ -62,12 +64,23 @@ class CatalogSourceTable[T](
       .toMap
   }
 
-  lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]
-
-  override def getQualifiedName: JList[String] = explainSourceAsString(tableSource)
+  override def getQualifiedName: JList[String] = {
+    // Do not explain source, we already have full names, table source should be created in toRel.
+    val ret = new util.ArrayList[String](names)
+    // Add class name to distinguish TableSourceTable.
+    val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
+    ret.add(s"catalog_source: [$name]")
+    ret
+  }
 
   override def toRel(context: RelOptTable.ToRelContext): RelNode = {
     val cluster = context.getCluster
+    val flinkContext = cluster
+        .getPlanner
+        .getContext
+        .unwrap(classOf[FlinkContext])
+
+    val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
     val tableSourceTable = new TableSourceTable[T](
       relOptSchema,
       schemaTable.getTableIdentifier,
@@ -91,11 +104,7 @@ class CatalogSourceTable[T](
     val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
     relBuilder.push(scan)
 
-    val toRexFactory = cluster
-        .getPlanner
-        .getContext
-        .unwrap(classOf[FlinkContext])
-        .getSqlExprToRexConverterFactory
+    val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
 
     // 2. push computed column project
     val fieldNames = rowType.getFieldNames.asScala
@@ -140,32 +149,25 @@ class CatalogSourceTable[T](
     relBuilder.build()
   }
 
-  /** Create the table source lazily. */
-  private def findAndCreateTableSource(): TableSource[_] = {
+  /** Create the table source. */
+  private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = {
     val tableFactoryOpt = schemaTable.getTableFactory
+    val context = new TableSourceFactoryContextImpl(
+      schemaTable.getTableIdentifier, catalogTable, conf)
     val tableSource = if (tableFactoryOpt.isPresent) {
       tableFactoryOpt.get() match {
         case tableSourceFactory: TableSourceFactory[_] =>
-          tableSourceFactory.createTableSource(
-            schemaTable.getTableIdentifier.toObjectPath,
-            catalogTable)
+          tableSourceFactory.createTableSource(context)
         case _ => throw new TableException("Cannot query a sink-only table. "
           + "TableFactory provided by catalog must implement TableSourceFactory")
       }
     } else {
-      TableFactoryUtil.findAndCreateTableSource(catalogTable)
+      TableFactoryUtil.findAndCreateTableSource(context)
     }
     if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
       throw new TableException("Catalog tables support only "
         + "StreamTableSource and InputFormatTableSource")
     }
-    tableSource
-  }
-
-  override protected def explainSourceAsString(ts: TableSource[_]): JList[String] = {
-    val ret = new util.ArrayList[String](super.explainSourceAsString(ts))
-    // Add class name to distinguish TableSourceTable.
-    ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName)
-    ret
+    tableSource.asInstanceOf[TableSource[T]]
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 39a67ca..63bea00 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
   <TestCase name="testTableApiScanWithDDL">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
   <TestCase name="testTableApiScanWithTemporaryTable">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
   <TestCase name="testTableApiScanWithWatermark">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
   <TestCase name="testTableApiScanWithComputedColumn">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 01c8fb8..170f118 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
@@ -30,12 +29,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
@@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JL
 
 import scala.collection.JavaConversions._
 
-class TestCollectionTableFactory
-  extends StreamTableSourceFactory[Row]
-    with StreamTableSinkFactory[Row]
-    with BatchTableSourceFactory[Row]
-    with BatchTableSinkFactory[Row]
-{
+class TestCollectionTableFactory extends TableSourceFactory[Row] with TableSinkFactory[Row] {
 
-  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
-    getCollectionSource(properties)
+  override def createTableSource(
+      context: TableSourceFactory.Context): StreamTableSource[Row] = {
+    getCollectionSource(context)
   }
 
-  override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
-    getCollectionSink(properties)
+  override def createTableSink(
+      context: TableSinkFactory.Context): StreamTableSink[Row] = {
+    getCollectionSink(context)
   }
 
   override def requiredContext(): JMap[String, String] = {
@@ -118,18 +97,14 @@ object TestCollectionTableFactory {
 
   def getResult: util.List[Row] = RESULT
 
-  def getCollectionSource(props: JMap[String, String]): CollectionTableSource = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true)
+  def getCollectionSource(context: TableSourceFactory.Context): CollectionTableSource = {
+    val schema = context.getTable.getSchema
+    val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, "true").toBoolean
     new CollectionTableSource(emitIntervalMS, physicalSchema(schema), isBounded)
   }
 
-  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
+  def getCollectionSink(context: TableSinkFactory.Context): CollectionTableSink = {
+    val schema = context.getTable.getSchema
     new CollectionTableSink(physicalSchema(schema))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
index e04f9eb..4a91621 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
@@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, TableSchema}
-import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
-import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 
 import org.junit.Test
-import org.mockito.{ArgumentMatchers, Mockito}
-
-import java.util.Optional
-
-import scala.collection.JavaConverters._
 
 class SinkTest extends TableTestBase {
 
@@ -68,26 +59,4 @@ class SinkTest extends TableTestBase {
 
     util.verifyPlan()
   }
-
-  @Test
-  def testCatalogTableSink(): Unit = {
-    val schemaBuilder = new TableSchema.Builder()
-    schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
-    val schema = schemaBuilder.build()
-    val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
-    val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
-    val factory = Mockito.mock(classOf[TableSinkFactory[_]])
-    Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
-    Mockito.when[TableSink[_]](factory.createTableSink(
-      ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
-    util.tableEnv.registerCatalog(catalog.getName, catalog)
-    util.tableEnv.useCatalog(catalog.getName)
-    val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
-    catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
-    util.tableEnv.sqlQuery("select 1").insertInto("tbl")
-    util.tableEnv.explain(false)
-    // verify we tried to get table factory from catalog
-    Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
new file mode 100644
index 0000000..9661bfc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+@RunWith(classOf[Parameterized])
+class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
+
+  private val util = if (isBatch) batchTestUtil() else streamTestUtil()
+
+  @Test
+  def testTableSourceSinkFactory(): Unit = {
+    val factory = new TestContextTableFactory(
+      ObjectIdentifier.of("cat", "default", "t1"),
+      ObjectIdentifier.of("cat", "default", "t2"))
+    util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true)
+    util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
+      override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
+    })
+    util.tableEnv.useCatalog("cat")
+
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c as a + 1
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b as c - 1,
+        |  c int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.c from t1
+      """.stripMargin
+    util.tableEnv.sqlUpdate(sourceDDL)
+    util.tableEnv.sqlUpdate(sinkDDL)
+    util.tableEnv.sqlUpdate(query)
+
+    util.tableEnv.explain(false)
+    Assert.assertTrue(factory.hasInvokedSource)
+    Assert.assertTrue(factory.hasInvokedSink)
+  }
+}
+
+object TableFactoryTest {
+  @Parameterized.Parameters(name = "isBatch: {0}")
+  def parameters(): java.util.Collection[Boolean] = {
+    java.util.Arrays.asList(true, false)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..9e60cd7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+  * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context.
+  */
+class TestContextTableFactory[T](
+    sourceIdentifier: ObjectIdentifier,
+    sinkIdentifier: ObjectIdentifier)
+    extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+  var hasInvokedSource = false
+  var hasInvokedSink = false
+
+  override def requiredContext(): util.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+    hasInvokedSource = true
+    TableFactoryUtil.findAndCreateTableSource(context)
+  }
+
+  override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+    hasInvokedSink = true
+    TableFactoryUtil.findAndCreateTableSink(context)
+  }
+}
+
+object TestContextTableFactory{
+  val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+      .key("testing.required.key").booleanType().defaultValue(false)
+}