Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/22 03:50:43 UTC

[flink] branch master updated (9511243 -> 052eb77)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9511243  [FLINK-16186][es][tests] Reduce connect timeout to 5 seconds
     new f6895da  [FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory
     new 306a89a  [FLINK-15912][table] Support create table source/sink by context in sql-cli
     new 69d8816  [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
     new e280ffc  [FLINK-15912][table-planner] Support create table source/sink by context in legacy planner
     new 4e92bdb  [FLINK-15912][table] Support create table source/sink by context in hive connector
     new 052eb77  [FLINK-15912][table] Clean TableFactoryUtil

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/connectors/hive/HiveTableFactory.java    | 26 ++-----
 .../connectors/hive/HiveTableFactoryTest.java      | 16 +++-
 .../flink/connectors/hive/HiveTableSourceTest.java |  3 +-
 .../client/gateway/local/ExecutionContext.java     | 37 ++++++---
 .../gateway/utils/TestTableSinkFactoryBase.java    | 11 +--
 .../gateway/utils/TestTableSourceFactoryBase.java  | 10 ++-
 .../table/factories/StreamTableSinkFactory.java    | 15 +++-
 .../table/factories/StreamTableSourceFactory.java  | 15 +++-
 .../flink/table/catalog/CatalogTableImpl.java      | 15 ++++
 .../table/descriptors/ConnectTableDescriptor.java  | 15 +---
 .../flink/table/factories/TableFactoryUtil.java    | 42 ++++-------
 .../table/descriptors/DescriptorProperties.java    | 19 +++++
 .../flink/table/factories/TableSinkFactory.java    | 49 +++++++++++-
 .../TableSinkFactoryContextImpl.java}              | 59 ++++++---------
 .../flink/table/factories/TableSourceFactory.java  | 49 +++++++++++-
 .../TableSourceFactoryContextImpl.java}            | 59 ++++++---------
 .../table/planner/delegation/PlannerBase.scala     | 16 ++--
 .../planner/plan/schema/CatalogSourceTable.scala   | 48 ++++++------
 .../table/planner/plan/batch/sql/TableScanTest.xml |  8 +-
 .../utils/TestCollectionTableFactory.scala         | 53 ++++---------
 .../table/planner/plan/batch/sql/SinkTest.scala    | 31 --------
 .../planner/plan/common/TableFactoryTest.scala     | 88 ++++++++++++++++++++++
 .../plan/utils/TestContextTableFactory.scala       | 69 +++++++++++++++++
 .../flink/table/catalog/CatalogCalciteSchema.java  | 11 ++-
 .../table/catalog/CatalogManagerCalciteSchema.java |  8 +-
 .../flink/table/catalog/DatabaseCalciteSchema.java | 27 ++++---
 .../flink/table/api/internal/TableEnvImpl.scala    | 15 ++--
 .../apache/flink/table/planner/StreamPlanner.scala | 16 ++--
 .../table/catalog/DatabaseCalciteSchemaTest.java   |  4 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  2 +-
 .../table/api/stream/sql/TableFactoryTest.scala    | 74 ++++++++++++++++++
 .../table/utils/TestContextTableFactory.scala      | 69 +++++++++++++++++
 32 files changed, 673 insertions(+), 306 deletions(-)
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/{api/constraints/AbstractConstraint.java => factories/TableSinkFactoryContextImpl.java} (53%)
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/{api/constraints/AbstractConstraint.java => factories/TableSourceFactoryContextImpl.java} (52%)
 create mode 100644 flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
 create mode 100644 flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
 create mode 100644 flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
 create mode 100644 flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
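
Context for readers skimming the digest: the common thread of the series is that TableSourceFactory and TableSinkFactory now receive a single Context argument in place of a raw property map or an (ObjectPath, CatalogTable) pair. A minimal Java sketch of what the context exposes, with an illustrative helper class (only the Flink types and accessors come from the patches below):

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.factories.TableSourceFactory;

    // Illustrative helper, not part of the patches.
    final class ContextGlance {
        static void describe(TableSourceFactory.Context context) {
            ObjectIdentifier id = context.getObjectIdentifier(); // full catalog.database.table path
            CatalogTable table = context.getTable();             // schema and table properties
            ReadableConfig conf = context.getConfiguration();    // session configuration
            System.out.println(id + " schema: " + table.getSchema() + ", config: " + conf);
        }
    }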


[flink] 03/06: [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69d8816d164a106f8edf61a768569dafa5b0dc8d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Feb 10 15:35:42 2020 +0800

    [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
---
 .../flink/table/factories/TableFactoryUtil.java    | 39 ++++++++++
 .../table/planner/delegation/PlannerBase.scala     | 16 ++--
 .../planner/plan/schema/CatalogSourceTable.scala   | 48 ++++++------
 .../table/planner/plan/batch/sql/TableScanTest.xml |  8 +-
 .../utils/TestCollectionTableFactory.scala         | 53 ++++---------
 .../table/planner/plan/batch/sql/SinkTest.scala    | 31 --------
 .../planner/plan/common/TableFactoryTest.scala     | 88 ++++++++++++++++++++++
 .../plan/utils/TestContextTableFactory.scala       | 69 +++++++++++++++++
 8 files changed, 245 insertions(+), 107 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 57ca4cf..e33d55c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -34,6 +34,34 @@ import java.util.Optional;
 public class TableFactoryUtil {
 
 	/**
+	 * Returns a table source matching the descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) {
+		try {
+			return TableFactoryService
+					.find(TableSourceFactory.class, context.getTable().toProperties())
+					.createTableSource(context);
+		} catch (Throwable t) {
+			throw new TableException("findAndCreateTableSource failed.", t);
+		}
+	}
+
+	/**
+	 * Returns a table sink matching the context.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
+		try {
+			return TableFactoryService
+					.find(TableSinkFactory.class, context.getTable().toProperties())
+					.createTableSink(context);
+		} catch (Throwable t) {
+			throw new TableException("findAndCreateTableSink failed.", t);
+		}
+	}
+
+	/**
 	 * Returns a table source matching the properties.
 	 */
 	@SuppressWarnings("unchecked")
@@ -86,4 +114,15 @@ public class TableFactoryUtil {
 		return Optional.empty();
 	}
 
+	/**
+	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
+	 */
+	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
+		TableFactory tableFactory = catalog.getTableFactory().orElse(null);
+		if (tableFactory instanceof TableSinkFactory) {
+			return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
+		}
+		return Optional.empty();
+	}
+
 }
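
A caller-side sketch of the new overloads above. The identifier, schema, and connector property are illustrative, and a factory matching the properties is assumed to be on the classpath:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.CatalogTableImpl;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.factories.TableFactoryUtil;
    import org.apache.flink.table.factories.TableSourceFactory;
    import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
    import org.apache.flink.table.sources.TableSource;

    import java.util.Collections;

    final class FindSourceExample {
        static TableSource<?> create() {
            CatalogTable table = new CatalogTableImpl(
                TableSchema.builder().field("a", DataTypes.INT()).build(),
                Collections.singletonMap("connector", "COLLECTION"), // illustrative property
                "example table");
            TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
                ObjectIdentifier.of("default_catalog", "default_database", "t1"),
                table,
                new Configuration());
            // Throws TableException if no factory matches the table's properties.
            return TableFactoryUtil.findAndCreateTableSource(context);
        }
    }
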
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index ab084e5..47892a1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
-import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink}
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.utils.TableSchemaUtils
 
@@ -296,19 +296,15 @@ abstract class PlannerBase(
       case Some(s) if s.isInstanceOf[CatalogTable] =>
         val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val table = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, table, getTableConfig.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val objectPath = objectIdentifier.toObjectPath
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            table,
-            objectPath)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(table, sink.get())
           }
         }
-        val sinkProperties = table.toProperties
-        Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(table, TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
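
Rendered as a standalone Java sketch, the lookup order the planner now follows is roughly this (parameters stand in for the planner's fields; not verbatim from the patch):

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.factories.TableFactoryUtil;
    import org.apache.flink.table.factories.TableSinkFactory;
    import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
    import org.apache.flink.table.sinks.TableSink;

    import java.util.Optional;

    final class SinkLookup {
        static TableSink<?> lookup(
                Optional<Catalog> catalog,
                ObjectIdentifier identifier,
                CatalogTable table,
                ReadableConfig config) {
            TableSinkFactory.Context context =
                new TableSinkFactoryContextImpl(identifier, table, config);
            // 1. Prefer a factory provided by the catalog itself.
            if (catalog.isPresent() && catalog.get().getTableFactory().isPresent()) {
                Optional<TableSink> sink =
                    TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context);
                if (sink.isPresent()) {
                    return sink.get();
                }
            }
            // 2. Fall back to service discovery over the table's properties.
            return TableFactoryUtil.findAndCreateTableSink(context);
        }
    }
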
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..9a82a8d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.CatalogTable
-import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation}
+import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
 
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
@@ -62,12 +64,23 @@ class CatalogSourceTable[T](
       .toMap
   }
 
-  lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]
-
-  override def getQualifiedName: JList[String] = explainSourceAsString(tableSource)
+  override def getQualifiedName: JList[String] = {
+    // Do not explain source, we already have full names, table source should be created in toRel.
+    val ret = new util.ArrayList[String](names)
+    // Add class name to distinguish TableSourceTable.
+    val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames)
+    ret.add(s"catalog_source: [$name]")
+    ret
+  }
 
   override def toRel(context: RelOptTable.ToRelContext): RelNode = {
     val cluster = context.getCluster
+    val flinkContext = cluster
+        .getPlanner
+        .getContext
+        .unwrap(classOf[FlinkContext])
+
+    val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
     val tableSourceTable = new TableSourceTable[T](
       relOptSchema,
       schemaTable.getTableIdentifier,
@@ -91,11 +104,7 @@ class CatalogSourceTable[T](
     val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
     relBuilder.push(scan)
 
-    val toRexFactory = cluster
-        .getPlanner
-        .getContext
-        .unwrap(classOf[FlinkContext])
-        .getSqlExprToRexConverterFactory
+    val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
 
     // 2. push computed column project
     val fieldNames = rowType.getFieldNames.asScala
@@ -140,32 +149,25 @@ class CatalogSourceTable[T](
     relBuilder.build()
   }
 
-  /** Create the table source lazily. */
-  private def findAndCreateTableSource(): TableSource[_] = {
+  /** Create the table source. */
+  private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = {
     val tableFactoryOpt = schemaTable.getTableFactory
+    val context = new TableSourceFactoryContextImpl(
+      schemaTable.getTableIdentifier, catalogTable, conf)
     val tableSource = if (tableFactoryOpt.isPresent) {
       tableFactoryOpt.get() match {
         case tableSourceFactory: TableSourceFactory[_] =>
-          tableSourceFactory.createTableSource(
-            schemaTable.getTableIdentifier.toObjectPath,
-            catalogTable)
+          tableSourceFactory.createTableSource(context)
         case _ => throw new TableException("Cannot query a sink-only table. "
           + "TableFactory provided by catalog must implement TableSourceFactory")
       }
     } else {
-      TableFactoryUtil.findAndCreateTableSource(catalogTable)
+      TableFactoryUtil.findAndCreateTableSource(context)
     }
     if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
       throw new TableException("Catalog tables support only "
         + "StreamTableSource and InputFormatTableSource")
     }
-    tableSource
-  }
-
-  override protected def explainSourceAsString(ts: TableSource[_]): JList[String] = {
-    val ret = new util.ArrayList[String](super.explainSourceAsString(ts))
-    // Add class name to distinguish TableSourceTable.
-    ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName)
-    ret
+    tableSource.asInstanceOf[TableSource[T]]
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 39a67ca..63bea00 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
   <TestCase name="testTableApiScanWithDDL">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti
   <TestCase name="testTableApiScanWithTemporaryTable">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable
   <TestCase name="testTableApiScanWithWatermark">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
   <TestCase name="testTableApiScanWithComputedColumn">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 01c8fb8..170f118 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
@@ -30,12 +29,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
@@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JL
 
 import scala.collection.JavaConversions._
 
-class TestCollectionTableFactory
-  extends StreamTableSourceFactory[Row]
-    with StreamTableSinkFactory[Row]
-    with BatchTableSourceFactory[Row]
-    with BatchTableSinkFactory[Row]
-{
+class TestCollectionTableFactory extends TableSourceFactory[Row] with TableSinkFactory[Row] {
 
-  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
-    getCollectionSource(properties)
+  override def createTableSource(
+      context: TableSourceFactory.Context): StreamTableSource[Row] = {
+    getCollectionSource(context)
   }
 
-  override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
-    getCollectionSink(properties)
+  override def createTableSink(
+      context: TableSinkFactory.Context): StreamTableSink[Row] = {
+    getCollectionSink(context)
   }
 
   override def requiredContext(): JMap[String, String] = {
@@ -118,18 +97,14 @@ object TestCollectionTableFactory {
 
   def getResult: util.List[Row] = RESULT
 
-  def getCollectionSource(props: JMap[String, String]): CollectionTableSource = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true)
+  def getCollectionSource(context: TableSourceFactory.Context): CollectionTableSource = {
+    val schema = context.getTable.getSchema
+    val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, "true").toBoolean
     new CollectionTableSource(emitIntervalMS, physicalSchema(schema), isBounded)
   }
 
-  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
+  def getCollectionSink(context: TableSinkFactory.Context): CollectionTableSink = {
+    val schema = context.getTable.getSchema
     new CollectionTableSink(physicalSchema(schema))
   }
 
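Under the new contract the factory reads schema and options straight off the context's CatalogTable, with no DescriptorProperties round-trip. The same pattern as a Java sketch, where the "is-bounded" literal is illustrative (the Scala code uses its IS_BOUNDED constant):

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.factories.TableSourceFactory;

    final class ContextOptions {
        // Sketch of schema/option access under the new contract.
        static boolean isBounded(TableSourceFactory.Context context) {
            CatalogTable table = context.getTable();
            TableSchema schema = table.getSchema(); // available directly on the table
            System.out.println("building source with schema " + schema);
            return Boolean.parseBoolean(
                table.getProperties().getOrDefault("is-bounded", "true"));
        }
    }
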
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
index e04f9eb..4a91621 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
@@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, TableSchema}
-import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
-import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 
 import org.junit.Test
-import org.mockito.{ArgumentMatchers, Mockito}
-
-import java.util.Optional
-
-import scala.collection.JavaConverters._
 
 class SinkTest extends TableTestBase {
 
@@ -68,26 +59,4 @@ class SinkTest extends TableTestBase {
 
     util.verifyPlan()
   }
-
-  @Test
-  def testCatalogTableSink(): Unit = {
-    val schemaBuilder = new TableSchema.Builder()
-    schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
-    val schema = schemaBuilder.build()
-    val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
-    val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
-    val factory = Mockito.mock(classOf[TableSinkFactory[_]])
-    Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
-    Mockito.when[TableSink[_]](factory.createTableSink(
-      ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
-    util.tableEnv.registerCatalog(catalog.getName, catalog)
-    util.tableEnv.useCatalog(catalog.getName)
-    val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
-    catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
-    util.tableEnv.sqlQuery("select 1").insertInto("tbl")
-    util.tableEnv.explain(false)
-    // verify we tried to get table factory from catalog
-    Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
new file mode 100644
index 0000000..9661bfc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+@RunWith(classOf[Parameterized])
+class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
+
+  private val util = if (isBatch) batchTestUtil() else streamTestUtil()
+
+  @Test
+  def testTableSourceSinkFactory(): Unit = {
+    val factory = new TestContextTableFactory(
+      ObjectIdentifier.of("cat", "default", "t1"),
+      ObjectIdentifier.of("cat", "default", "t2"))
+    util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true)
+    util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
+      override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
+    })
+    util.tableEnv.useCatalog("cat")
+
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c as a + 1
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b as c - 1,
+        |  c int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.c from t1
+      """.stripMargin
+    util.tableEnv.sqlUpdate(sourceDDL)
+    util.tableEnv.sqlUpdate(sinkDDL)
+    util.tableEnv.sqlUpdate(query)
+
+    util.tableEnv.explain(false)
+    Assert.assertTrue(factory.hasInvokedSource)
+    Assert.assertTrue(factory.hasInvokedSink)
+  }
+}
+
+object TableFactoryTest {
+  @Parameterized.Parameters(name = "isBatch: {0}")
+  def parameters(): java.util.Collection[Boolean] = {
+    java.util.Arrays.asList(true, false)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..9e60cd7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+  * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context.
+  */
+class TestContextTableFactory[T](
+    sourceIdentifier: ObjectIdentifier,
+    sinkIdentifier: ObjectIdentifier)
+    extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+  var hasInvokedSource = false
+  var hasInvokedSink = false
+
+  override def requiredContext(): util.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+    hasInvokedSource = true
+    TableFactoryUtil.findAndCreateTableSource(context)
+  }
+
+  override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+    hasInvokedSink = true
+    TableFactoryUtil.findAndCreateTableSink(context)
+  }
+}
+
+object TestContextTableFactory{
+  val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+      .key("testing.required.key").booleanType().defaultValue(false)
+}
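
A sketch of how such a context-checking factory gets wired in, mirroring the Scala test above in Java; the catalog and database names are illustrative:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.GenericInMemoryCatalog;
    import org.apache.flink.table.factories.TableFactory;

    import java.util.Optional;

    // The catalog hands out the context-checking factory, so all
    // source/sink creation for its tables flows through that factory.
    final class RegisterFactoryExample {
        static void register(TableEnvironment tEnv, TableFactory factory) {
            tEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
                @Override
                public Optional<TableFactory> getTableFactory() {
                    return Optional.of(factory);
                }
            });
            tEnv.useCatalog("cat");
        }
    }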


[flink] 06/06: [FLINK-15912][table] Clean TableFactoryUtil

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 052eb7722325cf1ef91ff5b898c30996e688eabe
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Feb 12 17:48:52 2020 +0800

    [FLINK-15912][table] Clean TableFactoryUtil
---
 .../flink/table/factories/TableFactoryUtil.java    | 29 ----------------------
 1 file changed, 29 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index beca21d..336d694 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -61,34 +60,6 @@ public class TableFactoryUtil {
 	}
 
 	/**
-	 * Returns a table source matching the properties.
-	 */
-	@SuppressWarnings("unchecked")
-	private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
-		try {
-			return TableFactoryService
-				.find(TableSourceFactory.class, properties)
-				.createTableSource(properties);
-		} catch (Throwable t) {
-			throw new TableException("findAndCreateTableSource failed.", t);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> TableSink<T> findAndCreateTableSink(Map<String, String> properties) {
-		TableSink tableSink;
-		try {
-			tableSink = TableFactoryService
-				.find(TableSinkFactory.class, properties)
-				.createTableSink(properties);
-		} catch (Throwable t) {
-			throw new TableException("findAndCreateTableSink failed.", t);
-		}
-
-		return tableSink;
-	}
-
-	/**
 	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
 	 */
 	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {


[flink] 04/06: [FLINK-15912][table-planner] Support create table source/sink by context in legacy planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e280ffc8db37697104809c9fed8b0ed2a850372c
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Feb 11 14:28:02 2020 +0800

    [FLINK-15912][table-planner] Support create table source/sink by context in legacy planner
---
 .../flink/table/factories/TableFactoryUtil.java    | 12 ----
 .../flink/table/catalog/CatalogCalciteSchema.java  | 11 +++-
 .../table/catalog/CatalogManagerCalciteSchema.java |  8 ++-
 .../flink/table/catalog/DatabaseCalciteSchema.java | 27 +++++---
 .../flink/table/api/internal/TableEnvImpl.scala    | 15 ++---
 .../apache/flink/table/planner/StreamPlanner.scala | 16 ++---
 .../table/catalog/DatabaseCalciteSchemaTest.java   |  4 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  2 +-
 .../table/api/stream/sql/TableFactoryTest.scala    | 74 ++++++++++++++++++++++
 .../table/utils/TestContextTableFactory.scala      | 69 ++++++++++++++++++++
 10 files changed, 191 insertions(+), 47 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index e33d55c..bea892e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.factories;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
@@ -106,17 +105,6 @@ public class TableFactoryUtil {
 	/**
 	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
 	 */
-	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, ObjectPath tablePath) {
-		TableFactory tableFactory = catalog.getTableFactory().orElse(null);
-		if (tableFactory instanceof TableSinkFactory) {
-			return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(tablePath, catalogTable));
-		}
-		return Optional.empty();
-	}
-
-	/**
-	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
-	 */
 	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
 		TableFactory tableFactory = catalog.getTableFactory().orElse(null);
 		if (tableFactory instanceof TableSinkFactory) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 5be5a79..f5dc057 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -44,11 +45,17 @@ public class CatalogCalciteSchema implements Schema {
 	private final boolean isStreamingMode;
 	private final String catalogName;
 	private final CatalogManager catalogManager;
+	private final TableConfig tableConfig;
 
-	public CatalogCalciteSchema(boolean isStreamingMode, String catalogName, CatalogManager catalogManager) {
+	public CatalogCalciteSchema(
+			boolean isStreamingMode,
+			String catalogName,
+			CatalogManager catalogManager,
+			TableConfig tableConfig) {
 		this.isStreamingMode = isStreamingMode;
 		this.catalogName = catalogName;
 		this.catalogManager = catalogManager;
+		this.tableConfig = tableConfig;
 	}
 
 	/**
@@ -60,7 +67,7 @@ public class CatalogCalciteSchema implements Schema {
 	@Override
 	public Schema getSubSchema(String schemaName) {
 		if (catalogManager.schemaExists(catalogName, schemaName)) {
-			return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalogManager);
+			return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalogManager, tableConfig);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index e7376fc..1ec9783 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -43,10 +44,13 @@ import java.util.Set;
 public class CatalogManagerCalciteSchema implements Schema {
 
 	private final CatalogManager catalogManager;
+	private final TableConfig tableConfig;
 	private boolean isStreamingMode;
 
-	public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) {
+	public CatalogManagerCalciteSchema(
+			CatalogManager catalogManager, TableConfig tableConfig, boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
+		this.tableConfig = tableConfig;
 		this.isStreamingMode = isStreamingMode;
 	}
 
@@ -83,7 +87,7 @@ public class CatalogManagerCalciteSchema implements Schema {
 	@Override
 	public Schema getSubSchema(String name) {
 		if (catalogManager.schemaExists(name)) {
-			return new CatalogCalciteSchema(isStreamingMode, name, catalogManager);
+			return new CatalogCalciteSchema(isStreamingMode, name, catalogManager, tableConfig);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 1dd5b23..6226dff 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.plan.schema.TableSinkTable;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
@@ -57,16 +59,19 @@ class DatabaseCalciteSchema implements Schema {
 	private final String catalogName;
 	private final String databaseName;
 	private final CatalogManager catalogManager;
+	private final TableConfig tableConfig;
 
 	public DatabaseCalciteSchema(
 			boolean isStreamingMode,
 			String databaseName,
 			String catalogName,
-			CatalogManager catalogManager) {
+			CatalogManager catalogManager,
+			TableConfig tableConfig) {
 		this.isStreamingMode = isStreamingMode;
 		this.databaseName = databaseName;
 		this.catalogName = catalogName;
 		this.catalogManager = catalogManager;
+		this.tableConfig = tableConfig;
 	}
 
 	@Override
@@ -83,20 +88,20 @@ class DatabaseCalciteSchema implements Schema {
 						.flatMap(Catalog::getTableFactory)
 						.orElse(null);
 				}
-				return convertTable(identifier.toObjectPath(), table, tableFactory);
+				return convertTable(identifier, table, tableFactory);
 			})
 			.orElse(null);
 	}
 
-	private Table convertTable(ObjectPath tablePath, CatalogBaseTable table, @Nullable TableFactory tableFactory) {
+	private Table convertTable(ObjectIdentifier identifier, CatalogBaseTable table, @Nullable TableFactory tableFactory) {
 		if (table instanceof QueryOperationCatalogView) {
 			return QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView) table));
 		} else if (table instanceof ConnectorCatalogTable) {
 			return convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
 		} else if (table instanceof CatalogTable) {
-			return convertCatalogTable(tablePath, (CatalogTable) table, tableFactory);
+			return convertCatalogTable(identifier, (CatalogTable) table, tableFactory);
 		} else if (table instanceof CatalogView) {
-			return convertCatalogView(tablePath, (CatalogView) table);
+			return convertCatalogView(identifier.getObjectName(), (CatalogView) table);
 		} else {
 			throw new TableException("Unsupported table type: " + table);
 		}
@@ -125,17 +130,19 @@ class DatabaseCalciteSchema implements Schema {
 		}
 	}
 
-	private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table, @Nullable TableFactory tableFactory) {
+	private Table convertCatalogTable(ObjectIdentifier identifier, CatalogTable table, @Nullable TableFactory tableFactory) {
 		final TableSource<?> tableSource;
+		final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
+				identifier, table, tableConfig.getConfiguration());
 		if (tableFactory != null) {
 			if (tableFactory instanceof TableSourceFactory) {
-				tableSource = ((TableSourceFactory) tableFactory).createTableSource(tablePath, table);
+				tableSource = ((TableSourceFactory) tableFactory).createTableSource(context);
 			} else {
 				throw new TableException(
 					"Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory");
 			}
 		} else {
-			tableSource = TableFactoryUtil.findAndCreateTableSource(table);
+			tableSource = TableFactoryUtil.findAndCreateTableSource(context);
 		}
 
 		if (!(tableSource instanceof StreamTableSource)) {
@@ -153,14 +160,14 @@ class DatabaseCalciteSchema implements Schema {
 		);
 	}
 
-	private Table convertCatalogView(ObjectPath tableName, CatalogView table) {
+	private Table convertCatalogView(String tableName, CatalogView table) {
 		TableSchema schema = table.getSchema();
 		return new ViewTable(
 			null,
 			typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(schema),
 			table.getExpandedQuery(),
 			Arrays.asList(catalogName, databaseName),
-			Arrays.asList(catalogName, databaseName, tableName.getObjectName())
+			Arrays.asList(catalogName, databaseName, tableName)
 		);
 	}
 
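Consolidating the hunks above, the source-side resolution in the legacy planner now follows the same precedence as the sink side. A Java sketch (exception message shortened; parameters stand in for the schema's fields):

    import org.apache.flink.table.api.TableException;
    import org.apache.flink.table.factories.TableFactory;
    import org.apache.flink.table.factories.TableFactoryUtil;
    import org.apache.flink.table.factories.TableSourceFactory;
    import org.apache.flink.table.sources.TableSource;

    import javax.annotation.Nullable;

    final class SourceResolution {
        // A catalog-supplied factory wins and must implement
        // TableSourceFactory; otherwise fall back to discovery.
        static TableSource<?> resolve(
                @Nullable TableFactory tableFactory,
                TableSourceFactory.Context context) {
            if (tableFactory != null) {
                if (tableFactory instanceof TableSourceFactory) {
                    return ((TableSourceFactory<?>) tableFactory).createTableSource(context);
                }
                throw new TableException("Cannot query a sink-only table.");
            }
            return TableFactoryUtil.findAndCreateTableSource(context);
        }
    }
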
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3faee81..9fd0278 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _
 import org.apache.flink.table.delegation.Parser
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
-import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
 import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations.ddl._
@@ -102,7 +102,7 @@ abstract class TableEnvImpl(
     new PlanningConfigurationBuilder(
       config,
       functionCatalog,
-      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, isStreamingMode)),
       expressionBridge)
 
   private val parser: Parser = new ParserImpl(
@@ -741,18 +741,15 @@ abstract class TableEnvImpl(
 
         val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val catalogTable = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, catalogTable, config.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            catalogTable,
-            objectIdentifier.toObjectPath)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(sink.get())
           }
         }
-        val sinkProperties = catalogTable.toProperties
-        Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 6592f1b..46823f2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.delegation.{Executor, Parser, Planner}
 import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{ExpressionBridge, PlannerExpression, PlannerExpressionConverter, PlannerTypeInferenceUtilImpl}
-import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.plan.StreamOptimizer
@@ -51,7 +51,6 @@ import _root_.java.util
 import _root_.java.util.Objects
 import _root_.java.util.function.{Supplier => JSupplier}
 
-import _root_.scala.collection.JavaConversions._
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -77,7 +76,7 @@ class StreamPlanner(
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
   private val internalSchema: CalciteSchema =
-    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true))
+    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, true))
 
   // temporary bridge between API and planner
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
@@ -434,18 +433,15 @@ class StreamPlanner(
       case Some(s) if s.isInstanceOf[CatalogTable] =>
         val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val catalogTable = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, catalogTable, config.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            catalogTable,
-            objectIdentifier.toObjectPath)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(sink.get())
           }
         }
-        val sinkProperties = catalogTable.toProperties
-        Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 69e1438..a850f48 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExternalTableSource;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -55,7 +56,8 @@ public class DatabaseCalciteSchemaTest {
 		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
 			databaseName,
 			catalogName,
-			catalogManager);
+			catalogManager,
+			new TableConfig());
 
 		catalog.createTable(new ObjectPath(databaseName, tableName), new TestCatalogBaseTable(), false);
 		Table table = calciteSchema.getTable(tableName);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 8bd077b..036cc6a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -95,7 +95,7 @@ public class SqlToOperationConverterTest {
 	private final PlanningConfigurationBuilder planningConfigurationBuilder =
 		new PlanningConfigurationBuilder(tableConfig,
 			functionCatalog,
-			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
+			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, tableConfig, false)),
 			new ExpressionBridge<>(functionCatalog,
 				PlannerExpressionConverter.INSTANCE()));
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
new file mode 100644
index 0000000..9bdbfd1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.utils.{TableTestBase, TestContextTableFactory}
+
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+class TableFactoryTest extends TableTestBase {
+
+  private val util = streamTestUtil()
+
+  @Test
+  def testTableSourceSinkFactory(): Unit = {
+    val factory = new TestContextTableFactory(
+      ObjectIdentifier.of("cat", "default", "t1"),
+      ObjectIdentifier.of("cat", "default", "t2"))
+    util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true)
+    util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") {
+      override def getTableFactory: Optional[TableFactory] = Optional.of(factory)
+    })
+    util.tableEnv.useCatalog("cat")
+
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.c from t1
+      """.stripMargin
+    util.tableEnv.sqlUpdate(sourceDDL)
+    util.tableEnv.sqlUpdate(sinkDDL)
+    util.tableEnv.sqlUpdate(query)
+    Assert.assertTrue(factory.hasInvokedSource)
+    Assert.assertTrue(factory.hasInvokedSink)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..611076f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+  * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context.
+  */
+class TestContextTableFactory[T](
+    sourceIdentifier: ObjectIdentifier,
+    sinkIdentifier: ObjectIdentifier)
+    extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+  var hasInvokedSource = false
+  var hasInvokedSink = false
+
+  override def requiredContext(): util.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+    hasInvokedSource = true
+    TableFactoryUtil.findAndCreateTableSource(context)
+  }
+
+  override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+    hasInvokedSink = true
+    TableFactoryUtil.findAndCreateTableSink(context)
+  }
+}
+
+object TestContextTableFactory {
+  val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+      .key("testing.required.key").booleanType().defaultValue(false)
+}


[flink] 01/06: [FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6895da4f762e506c4d0fa8d6dea427094e84173
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Feb 10 14:58:41 2020 +0800

    [FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory
    
    This closes #11047
---
 .../table/factories/StreamTableSinkFactory.java    | 15 +++++-
 .../table/factories/StreamTableSourceFactory.java  | 15 +++++-
 .../flink/table/factories/TableSinkFactory.java    | 49 ++++++++++++++++-
 .../factories/TableSinkFactoryContextImpl.java     | 61 ++++++++++++++++++++++
 .../flink/table/factories/TableSourceFactory.java  | 49 ++++++++++++++++-
 .../factories/TableSourceFactoryContextImpl.java   | 61 ++++++++++++++++++++++
 6 files changed, 244 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
index 86c7e82..c721424 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -38,14 +39,24 @@ public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {
 	 *
 	 * @param properties normalized properties describing a table sink.
 	 * @return the configured table sink.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSink(Context)} instead.
 	 */
-	StreamTableSink<T> createStreamTableSink(Map<String, String> properties);
+	@Deprecated
+	default StreamTableSink<T> createStreamTableSink(Map<String, String> properties) {
+		return null;
+	}
 
 	/**
 	 * Only create stream table sink.
 	 */
 	@Override
 	default TableSink<T> createTableSink(Map<String, String> properties) {
-		return createStreamTableSink(properties);
+		StreamTableSink<T> sink = createStreamTableSink(properties);
+		if (sink == null) {
+			throw new ValidationException(
+					"Please override 'createTableSink(Context)' method.");
+		}
+		return sink;
 	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
index b007d7b..3e392ed 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -38,14 +39,24 @@ public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {
 	 *
 	 * @param properties normalized properties describing a stream table source.
 	 * @return the configured stream table source.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSource(Context)} instead.
 	 */
-	StreamTableSource<T> createStreamTableSource(Map<String, String> properties);
+	@Deprecated
+	default StreamTableSource<T> createStreamTableSource(Map<String, String> properties) {
+		return null;
+	}
 
 	/**
 	 * Only create a stream table source.
 	 */
 	@Override
 	default TableSource<T> createTableSource(Map<String, String> properties) {
-		return createStreamTableSource(properties);
+		StreamTableSource<T> source = createStreamTableSource(properties);
+		if (source == null) {
+			throw new ValidationException(
+					"Please override 'createTableSource(Context)' method.");
+		}
+		return source;
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
index 05d0f34..c92ec9f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -39,8 +41,13 @@ public interface TableSinkFactory<T> extends TableFactory {
 	 *
 	 * @param properties normalized properties describing a table sink.
 	 * @return the configured table sink.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSink(Context)} instead.
 	 */
-	TableSink<T> createTableSink(Map<String, String> properties);
+	@Deprecated
+	default TableSink<T> createTableSink(Map<String, String> properties) {
+		return null;
+	}
 
 	/**
 	 * Creates and configures a {@link TableSink} based on the given {@link CatalogTable} instance.
@@ -48,9 +55,49 @@ public interface TableSinkFactory<T> extends TableFactory {
 	 * @param tablePath path of the given {@link CatalogTable}
 	 * @param table {@link CatalogTable} instance.
 	 * @return the configured table sink.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSink(Context)} instead.
 	 */
+	@Deprecated
 	default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable table) {
 		return createTableSink(table.toProperties());
 	}
 
+	/**
+	 * Creates and configures a {@link TableSink} based on the given {@link Context}.
+	 *
+	 * @param context context of this table sink.
+	 * @return the configured table sink.
+	 */
+	default TableSink<T> createTableSink(Context context) {
+		return createTableSink(
+				context.getObjectIdentifier().toObjectPath(),
+				context.getTable());
+	}
+
+	/**
+	 * Context of table sink creation. Contains table information and environment information.
+	 */
+	interface Context {
+
+		/**
+		 * @return full identifier of the given {@link CatalogTable}.
+		 */
+		ObjectIdentifier getObjectIdentifier();
+
+		/**
+		 * @return table {@link CatalogTable} instance.
+		 */
+		CatalogTable getTable();
+
+		/**
+		 * @return readable config of this table environment. The configuration gives the ability
+		 * to access {@code TableConfig#getConfiguration()} which holds the current
+		 * {@code TableEnvironment} session configurations.
+		 */
+		ReadableConfig getConfiguration();
+	}
+
 }
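
A factory that targets the new interface only needs to override the Context
variant; the deprecated property-based methods can keep their defaults. A
minimal sketch of such a factory (MyTableSink and the "my-connector" type are
hypothetical, not part of this change):

    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.factories.TableSinkFactory;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class MySinkFactory implements TableSinkFactory<Row> {

        @Override
        public Map<String, String> requiredContext() {
            // matched by TableFactoryService against the normalized properties
            return Collections.singletonMap("connector.type", "my-connector");
        }

        @Override
        public List<String> supportedProperties() {
            // additional keys this factory accepts (illustrative)
            return Collections.singletonList("connector.path");
        }

        @Override
        public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
            // schema and identifier come from the context; no manual
            // property parsing is needed anymore
            CatalogTable table = context.getTable();
            return new MyTableSink( // hypothetical TableSink implementation
                    table.getSchema(),
                    context.getObjectIdentifier().toObjectPath());
        }
    }
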
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java
new file mode 100644
index 0000000..e69d353
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of {@link TableSinkFactory.Context}.
+ */
+@Internal
+public class TableSinkFactoryContextImpl implements TableSinkFactory.Context {
+
+	private final ObjectIdentifier identifier;
+	private final CatalogTable table;
+	private final ReadableConfig config;
+
+	public TableSinkFactoryContextImpl(
+			ObjectIdentifier identifier,
+			CatalogTable table,
+			ReadableConfig config) {
+		this.identifier = checkNotNull(identifier);
+		this.table = checkNotNull(table);
+		this.config = checkNotNull(config);
+	}
+
+	@Override
+	public ObjectIdentifier getObjectIdentifier() {
+		return identifier;
+	}
+
+	@Override
+	public CatalogTable getTable() {
+		return table;
+	}
+
+	@Override
+	public ReadableConfig getConfiguration() {
+		return config;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
index c0f97d9..8cc5bdc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sources.TableSource;
 
@@ -39,8 +41,13 @@ public interface TableSourceFactory<T> extends TableFactory {
 	 *
 	 * @param properties normalized properties describing a table source.
 	 * @return the configured table source.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSource(Context)} instead.
 	 */
-	TableSource<T> createTableSource(Map<String, String> properties);
+	@Deprecated
+	default TableSource<T> createTableSource(Map<String, String> properties) {
+		return null;
+	}
 
 	/**
 	 * Creates and configures a {@link TableSource} based on the given {@link CatalogTable} instance.
@@ -48,9 +55,49 @@ public interface TableSourceFactory<T> extends TableFactory {
 	 * @param tablePath path of the given {@link CatalogTable}
 	 * @param table {@link CatalogTable} instance.
 	 * @return the configured table source.
+	 * @deprecated {@link Context} contains more information and already includes the table schema.
+	 * Please use {@link #createTableSource(Context)} instead.
 	 */
+	@Deprecated
 	default TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table) {
 		return createTableSource(table.toProperties());
 	}
 
+	/**
+	 * Creates and configures a {@link TableSource} based on the given {@link Context}.
+	 *
+	 * @param context context of this table source.
+	 * @return the configured table source.
+	 */
+	default TableSource<T> createTableSource(Context context) {
+		return createTableSource(
+				context.getObjectIdentifier().toObjectPath(),
+				context.getTable());
+	}
+
+	/**
+	 * Context of table source creation. Contains table information and environment information.
+	 */
+	interface Context {
+
+		/**
+		 * @return full identifier of the given {@link CatalogTable}.
+		 */
+		ObjectIdentifier getObjectIdentifier();
+
+		/**
+		 * @return table {@link CatalogTable} instance.
+		 */
+		CatalogTable getTable();
+
+		/**
+		 * @return readable config of this table environment. The configuration gives the ability
+		 * to access {@code TableConfig#getConfiguration()} which holds the current
+		 * {@code TableEnvironment} session configurations.
+		 */
+		ReadableConfig getConfiguration();
+	}
+
 }
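
Because each new default funnels into the older method, existing factories
keep working unchanged; the ValidationException added to the Stream factories
above only fires when a caller goes through the property-based path and the
factory has overridden neither variant. The resulting call chains, sketched
for the source side (the sink side mirrors it):

    // Legacy factory: only the Map-based method is overridden.
    createTableSource(Context)                         // default
      -> createTableSource(ObjectPath, CatalogTable)   // default
         -> createTableSource(Map<String, String>)     // legacy override

    // New-style factory: only the Context method is overridden.
    createTableSource(Context)                         // override; reads schema,
                                                       // identifier, config from context
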
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java
new file mode 100644
index 0000000..5fcab42
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of {@link TableSourceFactory.Context}.
+ */
+@Internal
+public class TableSourceFactoryContextImpl implements TableSourceFactory.Context {
+
+	private final ObjectIdentifier identifier;
+	private final CatalogTable table;
+	private final ReadableConfig config;
+
+	public TableSourceFactoryContextImpl(
+			ObjectIdentifier identifier,
+			CatalogTable table,
+			ReadableConfig config) {
+		this.identifier = checkNotNull(identifier);
+		this.table = checkNotNull(table);
+		this.config = checkNotNull(config);
+	}
+
+	@Override
+	public ObjectIdentifier getObjectIdentifier() {
+		return identifier;
+	}
+
+	@Override
+	public CatalogTable getTable() {
+		return table;
+	}
+
+	@Override
+	public ReadableConfig getConfiguration() {
+		return config;
+	}
+}
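
Planner code and tests that need to invoke a factory directly can assemble the
context by hand from these Impl classes. A short sketch (the identifier and the
CatalogTable value are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.factories.TableSourceFactory;
    import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
    import org.apache.flink.table.sources.TableSource;

    // e.g. in a planner or test, given some CatalogTable 'table' and a
    // matching TableSourceFactory 'factory':
    TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
            ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
            table,
            new Configuration()); // Configuration implements ReadableConfig
    TableSource<?> source = factory.createTableSource(context);
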


[flink] 05/06: [FLINK-15912][table] Support create table source/sink by context in hive connector

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e92bdb186e6abc1d5e033ebfb4978e94af20cc7
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Feb 11 16:50:49 2020 +0800

    [FLINK-15912][table] Support create table source/sink by context in hive connector
---
 .../flink/connectors/hive/HiveTableFactory.java    | 26 +++++++---------------
 .../connectors/hive/HiveTableFactoryTest.java      | 16 +++++++++----
 .../flink/connectors/hive/HiveTableSourceTest.java |  3 ++-
 .../flink/table/factories/TableFactoryUtil.java    | 14 ------------
 4 files changed, 22 insertions(+), 37 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index dc70ec2..9a17b71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -64,26 +64,16 @@ public class HiveTableFactory
 	}
 
 	@Override
-	public TableSink<Row> createTableSink(Map<String, String> properties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public TableSource<BaseRow> createTableSource(Map properties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public TableSource<BaseRow> createTableSource(ObjectPath tablePath, CatalogTable table) {
-		Preconditions.checkNotNull(table);
+	public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) {
+		CatalogTable table = checkNotNull(context.getTable());
 		Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
 		boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
 		if (!isGeneric) {
-			return createHiveTableSource(tablePath, table);
+			return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table);
 		} else {
-			return TableFactoryUtil.findAndCreateTableSource(table);
+			return TableFactoryUtil.findAndCreateTableSource(context);
 		}
 	}
 
@@ -95,16 +85,16 @@ public class HiveTableFactory
 	}
 
 	@Override
-	public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) {
-		Preconditions.checkNotNull(table);
+	public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
+		CatalogTable table = checkNotNull(context.getTable());
 		Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
 		boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
 		if (!isGeneric) {
-			return createOutputFormatTableSink(tablePath, table);
+			return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
 		} else {
-			return TableFactoryUtil.findAndCreateTableSink(table);
+			return TableFactoryUtil.findAndCreateTableSink(context);
 		}
 	}
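
The Hive factory shows the intended pattern for catalog-supplied factories: a
single catalog property decides whether the table is handled natively or handed
back to the standard factory discovery. A sketch of the dispatch (not the
literal Hive code):

    boolean isGeneric = Boolean.parseBoolean(
            context.getTable().getProperties().get(CatalogConfig.IS_GENERIC));
    return isGeneric
            // generic table: fall back to SPI-based factory discovery
            ? TableFactoryUtil.findAndCreateTableSource(context)
            // native Hive table: build the source directly from the context
            : createHiveTableSource(
                    context.getObjectIdentifier().toObjectPath(), context.getTable());
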
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
index f04d496..88dc191 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -78,9 +82,11 @@ public class HiveTableFactoryTest {
 		Optional<TableFactory> opt = catalog.getTableFactory();
 		assertTrue(opt.isPresent());
 		HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-		TableSource tableSource = tableFactory.createTableSource(path, table);
+		TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
+				ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
 		assertTrue(tableSource instanceof StreamTableSource);
-		TableSink tableSink = tableFactory.createTableSink(path, table);
+		TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
+				ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
 		assertTrue(tableSink instanceof StreamTableSink);
 	}
 
@@ -100,9 +106,11 @@ public class HiveTableFactoryTest {
 		Optional<TableFactory> opt = catalog.getTableFactory();
 		assertTrue(opt.isPresent());
 		HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-		TableSink tableSink = tableFactory.createTableSink(path, table);
+		TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
+				ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
 		assertTrue(tableSink instanceof HiveTableSink);
-		TableSource tableSource = tableFactory.createTableSource(path, table);
+		TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
+				ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
 		assertTrue(tableSource instanceof HiveTableSource);
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..e5afef5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -447,7 +448,7 @@ public class HiveTableSourceTest {
 
 		HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());
 		doReturn(new TestVectorReaderSource(new JobConf(hiveCatalog.getHiveConf()), tablePath, catalogTable))
-				.when(tableFactorySpy).createTableSource(any(ObjectPath.class), any(CatalogTable.class));
+				.when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));
 		HiveCatalog catalogSpy = spy(hiveCatalog);
 		doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index bea892e..beca21d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -89,20 +89,6 @@ public class TableFactoryUtil {
 	}
 
 	/**
-	 * Returns a table sink matching the {@link org.apache.flink.table.catalog.CatalogTable}.
-	 */
-	public static <T> TableSink<T> findAndCreateTableSink(CatalogTable table) {
-		return findAndCreateTableSink(table.toProperties());
-	}
-
-	/**
-	 * Returns a table source matching the {@link org.apache.flink.table.catalog.CatalogTable}.
-	 */
-	public static <T> TableSource<T> findAndCreateTableSource(CatalogTable table) {
-		return findAndCreateTableSource(table.toProperties());
-	}
-
-	/**
 	 * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
 	 */
 	public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {


[flink] 02/06: [FLINK-15912][table] Support create table source/sink by context in sql-cli

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 306a89a3556ca3fbab0306301f56972ccf11641b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Feb 11 15:39:59 2020 +0800

    [FLINK-15912][table] Support create table source/sink by context in sql-cli
---
 .../client/gateway/local/ExecutionContext.java     | 37 +++++++++++++++-------
 .../gateway/utils/TestTableSinkFactoryBase.java    | 11 +++----
 .../gateway/utils/TestTableSourceFactoryBase.java  | 10 +++---
 .../flink/table/catalog/CatalogTableImpl.java      | 15 +++++++++
 .../table/descriptors/ConnectTableDescriptor.java  | 15 +--------
 .../table/descriptors/DescriptorProperties.java    | 19 +++++++++++
 6 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ba69ebd..8bad114 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -48,11 +48,12 @@ import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
-import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -72,7 +73,9 @@ import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionService;
@@ -374,12 +377,18 @@ public class ExecutionContext<ClusterID> {
 		return factory.createCatalog(name, catalogProperties);
 	}
 
-	private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-		if (execution.isStreamingPlanner()) {
+	private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+		if (environment.getExecution().isStreamingPlanner()) {
 			final TableSourceFactory<?> factory = (TableSourceFactory<?>)
 				TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
-			return factory.createTableSource(sourceProperties);
-		} else if (execution.isBatchPlanner()) {
+			return factory.createTableSource(new TableSourceFactoryContextImpl(
+					ObjectIdentifier.of(
+							tableEnv.getCurrentCatalog(),
+							tableEnv.getCurrentDatabase(),
+							name),
+					CatalogTableImpl.fromProperties(sourceProperties),
+					tableEnv.getConfig().getConfiguration()));
+		} else if (environment.getExecution().isBatchPlanner()) {
 			final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
 				TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
 			return factory.createBatchTableSource(sourceProperties);
@@ -387,12 +396,18 @@ public class ExecutionContext<ClusterID> {
 		throw new SqlExecutionException("Unsupported execution type for sources.");
 	}
 
-	private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
-		if (execution.isStreamingPlanner()) {
+	private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+		if (environment.getExecution().isStreamingPlanner()) {
 			final TableSinkFactory<?> factory = (TableSinkFactory<?>)
 				TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
-			return factory.createTableSink(sinkProperties);
-		} else if (execution.isBatchPlanner()) {
+			return factory.createTableSink(new TableSinkFactoryContextImpl(
+					ObjectIdentifier.of(
+							tableEnv.getCurrentCatalog(),
+							tableEnv.getCurrentDatabase(),
+							name),
+					CatalogTableImpl.fromProperties(sinkProperties),
+					tableEnv.getConfig().getConfiguration()));
+		} else if (environment.getExecution().isBatchPlanner()) {
 			final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
 				TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
 			return factory.createBatchTableSink(sinkProperties);
@@ -567,10 +582,10 @@ public class ExecutionContext<ClusterID> {
 		Map<String, TableSink<?>> tableSinks = new HashMap<>();
 		environment.getTables().forEach((name, entry) -> {
 			if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
-				tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+				tableSources.put(name, createTableSource(name, entry.asMap()));
 			}
 			if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
-				tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+				tableSinks.put(name, createTableSink(name, entry.asMap()));
 			}
 		});
 		// register table sources
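
With this change, a table named "MyTable" defined in the SQL Client environment
file reaches the factory with roughly the following context (assuming Flink's
default catalog and database names):

    // context.getObjectIdentifier() -> default_catalog.default_database.MyTable
    // context.getTable()            -> CatalogTableImpl.fromProperties(entry.asMap())
    // context.getConfiguration()    -> the configuration from the session's TableConfig
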
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index 408bf24..e934959 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -93,12 +92,10 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
 	}
 
 	@Override
-	public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
-		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
+	public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
 		return new TestTableSink(
-				SchemaValidator.deriveTableSinkSchema(params),
-				properties.get(testProperty));
+				context.getTable().getSchema(),
+				context.getTable().getProperties().get(testProperty));
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index e64ae51..f305b90 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.Types;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -96,14 +97,15 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
 	}
 
 	@Override
-	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+	public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
+		TableSchema schema = context.getTable().getSchema();
 		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
+		params.putProperties(context.getTable().getProperties());
 		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
 		final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
 		return new TestTableSource(
-			TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)),
-			properties.get(testProperty),
+			schema,
+			context.getTable().getProperties().get(testProperty),
 			proctime.orElse(null),
 			rowtime);
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 5566fc3..9808b32 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -81,4 +81,19 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 
 		return descriptor.asMap();
 	}
+
+	/**
+	 * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
+	 */
+	public static CatalogTableImpl fromProperties(Map<String, String> properties) {
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putProperties(properties);
+		TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
+		descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+		return new CatalogTableImpl(
+				tableSchema,
+				descriptorProperties.asMap(),
+				""
+		);
+	}
 }
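
fromProperties is the inverse of toProperties: the table schema is parsed out
of the "schema." key group and those keys are then stripped from the remaining
table properties. A sketch of the round trip, assuming the 1.10-era property
keys:

    import org.apache.flink.table.catalog.CatalogTableImpl;

    import java.util.HashMap;
    import java.util.Map;

    // e.g. inside a unit test:
    Map<String, String> props = new HashMap<>();
    props.put("connector.type", "COLLECTION");
    props.put("schema.0.name", "a");
    props.put("schema.0.data-type", "INT");

    CatalogTableImpl table = CatalogTableImpl.fromProperties(props);
    // table.getSchema() now describes column 'a' (INT); the "schema." keys
    // are removed from table.getProperties(), leaving only "connector.type".
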
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index fb63382..a7c54c7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -80,19 +79,7 @@ public abstract class ConnectTableDescriptor
 					" use registerTableSource/registerTableSink/registerTableSourceAndSink.");
 		}
 
-		Map<String, String> schemaProperties = schemaDescriptor.toProperties();
-		TableSchema tableSchema = getTableSchema(schemaProperties);
-
-		Map<String, String> properties = new HashMap<>(toProperties());
-		schemaProperties.keySet().forEach(properties::remove);
-
-		CatalogTableImpl catalogTable = new CatalogTableImpl(
-			tableSchema,
-			properties,
-			""
-		);
-
-		registration.createTemporaryTable(path, catalogTable);
+		registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
 	}
 
 	private TableSchema getTableSchema(Map<String, String> schemaProperties) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index f299db4..1f17519 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -144,6 +145,24 @@ public class DescriptorProperties {
 	}
 
 	/**
+	 * Removes the mapping for a key prefix from this properties if it is present.
+	 *
+	 * <p>For example: for the prefix "flink", entries such as key "flink.k" with
+	 * value "v" will be removed.
+	 */
+	public void removeKeyPrefix(String prefix) {
+		checkNotNull(prefix);
+
+		Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
+		while (iterator.hasNext()) {
+			String key = iterator.next().getKey();
+			if (key.startsWith(prefix)) {
+				iterator.remove();
+			}
+		}
+	}
+
+	/**
 	 * Adds a class under the given key.
 	 */
 	public void putClass(String key, Class<?> clazz) {