You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/23 17:18:01 UTC

[flink] branch release-1.11 updated: [FLINK-18416][table-common] Deprecate TableEnvironment#connect API

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new ace2f4c  [FLINK-18416][table-common] Deprecate TableEnvironment#connect API
ace2f4c is described below

commit ace2f4c1595c09d1311c2f8fc7b557b5329559cd
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Jun 24 01:17:21 2020 +0800

    [FLINK-18416][table-common] Deprecate TableEnvironment#connect API
    
    This closes #12753
---
 flink-python/pyflink/table/table_environment.py    | 12 +++-
 .../api/bridge/java/BatchTableEnvironment.java     |  4 ++
 .../api/bridge/java/StreamTableEnvironment.java    |  4 ++
 .../apache/flink/table/api/TableEnvironment.java   | 10 +++-
 .../api/bridge/scala/BatchTableEnvironment.scala   | 60 ++++++++++---------
 .../api/bridge/scala/StreamTableEnvironment.scala  | 70 ++++++++++++----------
 6 files changed, 94 insertions(+), 66 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 4cdbfcd..e7f5698 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -189,7 +189,7 @@ class TableEnvironment(object):
         :param table_source: The table source to register.
         :type table_source: pyflink.table.TableSource
 
-        .. note:: Deprecated in 1.10. Use :func:`connect` instead.
+        .. note:: Deprecated in 1.10. Use :func:`execute_sql` instead.
         """
         warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
         self._j_tenv.registerTableSourceInternal(name, table_source._j_table_source)
@@ -214,7 +214,7 @@ class TableEnvironment(object):
         :param table_sink: The table sink to register.
         :type table_sink: pyflink.table.TableSink
 
-        .. note:: Deprecated in 1.10. Use :func:`connect` instead.
+        .. note:: Deprecated in 1.10. Use :func:`execute_sql` instead.
         """
         warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
         self._j_tenv.registerTableSinkInternal(name, table_sink._j_table_sink)
@@ -798,6 +798,8 @@ class TableEnvironment(object):
         :return: A :class:`~pyflink.table.descriptors.ConnectTableDescriptor` used to build the
                  temporary table.
         :rtype: pyflink.table.descriptors.ConnectTableDescriptor
+
+        .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
         """
         pass
 
@@ -1397,7 +1399,10 @@ class StreamTableEnvironment(TableEnvironment):
         :return: A :class:`~pyflink.table.descriptors.StreamTableDescriptor` used to build the
                  temporary table.
         :rtype: pyflink.table.descriptors.StreamTableDescriptor
+
+        .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
         """
+        warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
@@ -1531,7 +1536,10 @@ class BatchTableEnvironment(TableEnvironment):
                  to build the temporary table.
         :rtype: pyflink.table.descriptors.BatchTableDescriptor or
                 pyflink.table.descriptors.StreamTableDescriptor
+
+        .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
         """
+        warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
         gateway = get_gateway()
         blink_t_env_class = get_java_class(
             gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
index bf972f0..9b5dd00 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
@@ -442,8 +442,12 @@ public interface BatchTableEnvironment extends TableEnvironment {
 	 * </pre>
 	 *
 	 * @param connectorDescriptor connector descriptor describing the external system
+	 * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+	 * may be refactored in future versions. Please use {@link #executeSql(String) executeSql(ddl)}
+	 * to register a table instead.
 	 */
 	@Override
+	@Deprecated
 	BatchTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
 
 	/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index 6f80220..a287c9d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -636,8 +636,12 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 * </pre>
 	 *
 	 * @param connectorDescriptor connector descriptor describing the external system
+	 * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+	 * may be refactored in future versions. Please use {@link #executeSql(String) executeSql(ddl)}
+	 * to register a table instead.
 	 */
 	@Override
+	@Deprecated
 	StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index dfe5054e..87fe7d8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -555,7 +555,7 @@ public interface TableEnvironment {
 	 *
 	 * @param name        The name under which the {@link TableSource} is registered.
 	 * @param tableSource The {@link TableSource} to register.
-	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
 	 */
 	@Deprecated
 	void registerTableSource(String name, TableSource<?> tableSource);
@@ -573,7 +573,7 @@ public interface TableEnvironment {
 	 * @param fieldNames The field names to register with the {@link TableSink}.
 	 * @param fieldTypes The field types to register with the {@link TableSink}.
 	 * @param tableSink The {@link TableSink} to register.
-	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
 	 */
 	@Deprecated
 	void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
@@ -589,7 +589,7 @@ public interface TableEnvironment {
 	 *
 	 * @param name The name under which the {@link TableSink} is registered.
 	 * @param configuredSink The configured {@link TableSink} to register.
-	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
 	 */
 	@Deprecated
 	void registerTableSink(String name, TableSink<?> configuredSink);
@@ -728,7 +728,11 @@ public interface TableEnvironment {
 	 *</pre>
 	 *
 	 * @param connectorDescriptor connector descriptor describing the external system
+	 * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+	 * may be refactored in future versions. Please use {@link #executeSql(String) executeSql(ddl)}
+	 * to register a table instead.
 	 */
+	@Deprecated
 	ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
 
 	/**
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
index a769a56..829abf0 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
@@ -303,34 +303,38 @@ trait BatchTableEnvironment extends TableEnvironment {
   override def execute(jobName: String): JobExecutionResult
 
   /**
-    * Creates a temporary table from a descriptor.
-    *
-    * Descriptors allow for declaring the communication to external systems in an
-    * implementation-agnostic way. The classpath is scanned for suitable table factories that match
-    * the desired configuration.
-    *
-    * The following example shows how to read from a connector using a JSON format and
-    * registering a temporary table as "MyTable":
-    *
-    * {{{
-    *
-    * tableEnv
-    *   .connect(
-    *     new ExternalSystemXYZ()
-    *       .version("0.11"))
-    *   .withFormat(
-    *     new Json()
-    *       .jsonSchema("{...}")
-    *       .failOnMissingField(false))
-    *   .withSchema(
-    *     new Schema()
-    *       .field("user-name", "VARCHAR").from("u_name")
-    *       .field("count", "DECIMAL")
-    *   .createTemporaryTable("MyTable")
-    * }}}
-    *
-    * @param connectorDescriptor connector descriptor describing the external system
-    */
+   * Creates a temporary table from a descriptor.
+   *
+   * Descriptors allow for declaring the communication to external systems in an
+   * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+   * the desired configuration.
+   *
+   * The following example shows how to read from a connector using a JSON format and
+   * registering a temporary table as "MyTable":
+   *
+   * {{{
+   *
+   * tableEnv
+   *   .connect(
+   *     new ExternalSystemXYZ()
+   *       .version("0.11"))
+   *   .withFormat(
+   *     new Json()
+   *       .jsonSchema("{...}")
+   *       .failOnMissingField(false))
+   *   .withSchema(
+   *     new Schema()
+   *       .field("user-name", "VARCHAR").from("u_name")
+   *       .field("count", "DECIMAL")
+   *   .createTemporaryTable("MyTable")
+   * }}}
+   *
+   * @param connectorDescriptor connector descriptor describing the external system
+   * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API.
+   *             This method may be refactored in future versions.
+   *             Please use [[executeSql]] to register a table instead.
+   */
+  @deprecated
   override def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor
 }
 
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
index dd0373e..b462d74 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
@@ -359,39 +359,43 @@ trait StreamTableEnvironment extends TableEnvironment {
   override def execute(jobName: String): JobExecutionResult
 
   /**
-    * Creates a table source and/or table sink from a descriptor.
-    *
-    * Descriptors allow for declaring the communication to external systems in an
-    * implementation-agnostic way. The classpath is scanned for suitable table factories that match
-    * the desired configuration.
-    *
-    * The following example shows how to read from a Kafka connector using a JSON format and
-    * registering a table source "MyTable" in append mode:
-    *
-    * {{{
-    *
-    * tableEnv
-    *   .connect(
-    *     new Kafka()
-    *       .version("0.11")
-    *       .topic("clicks")
-    *       .property("group.id", "click-group")
-    *       .startFromEarliest())
-    *   .withFormat(
-    *     new Json()
-    *       .jsonSchema("{...}")
-    *       .failOnMissingField(false))
-    *   .withSchema(
-    *     new Schema()
-    *       .field("user-name", "VARCHAR").from("u_name")
-    *       .field("count", "DECIMAL")
-    *       .field("proc-time", "TIMESTAMP").proctime())
-    *   .inAppendMode()
-    *   .createTemporaryTable("MyTable")
-    * }}}
-    *
-    * @param connectorDescriptor connector descriptor describing the external system
-    */
+   * Creates a table source and/or table sink from a descriptor.
+   *
+   * Descriptors allow for declaring the communication to external systems in an
+   * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+   * the desired configuration.
+   *
+   * The following example shows how to read from a Kafka connector using a JSON format and
+   * registering a table source "MyTable" in append mode:
+   *
+   * {{{
+   *
+   * tableEnv
+   *   .connect(
+   *     new Kafka()
+   *       .version("0.11")
+   *       .topic("clicks")
+   *       .property("group.id", "click-group")
+   *       .startFromEarliest())
+   *   .withFormat(
+   *     new Json()
+   *       .jsonSchema("{...}")
+   *       .failOnMissingField(false))
+   *   .withSchema(
+   *     new Schema()
+   *       .field("user-name", "VARCHAR").from("u_name")
+   *       .field("count", "DECIMAL")
+   *       .field("proc-time", "TIMESTAMP").proctime())
+   *   .inAppendMode()
+   *   .createTemporaryTable("MyTable")
+   * }}}
+   *
+   * @param connectorDescriptor connector descriptor describing the external system
+   * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API.
+   *             This method may be refactored in future versions.
+   *             Please use [[executeSql]] to register a table instead.
+   */
+  @deprecated
   override def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor
 }