You are viewing a plain text version of this content. The canonical link for this message is available in the original mailing-list archive.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/23 17:18:01 UTC
[flink] branch release-1.11 updated: [FLINK-18416][table-common]
Deprecate TableEnvironment#connect API
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new ace2f4c [FLINK-18416][table-common] Deprecate TableEnvironment#connect API
ace2f4c is described below
commit ace2f4c1595c09d1311c2f8fc7b557b5329559cd
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Jun 24 01:17:21 2020 +0800
[FLINK-18416][table-common] Deprecate TableEnvironment#connect API
This closes #12753
---
flink-python/pyflink/table/table_environment.py | 12 +++-
.../api/bridge/java/BatchTableEnvironment.java | 4 ++
.../api/bridge/java/StreamTableEnvironment.java | 4 ++
.../apache/flink/table/api/TableEnvironment.java | 10 +++-
.../api/bridge/scala/BatchTableEnvironment.scala | 60 ++++++++++---------
.../api/bridge/scala/StreamTableEnvironment.scala | 70 ++++++++++++----------
6 files changed, 94 insertions(+), 66 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 4cdbfcd..e7f5698 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -189,7 +189,7 @@ class TableEnvironment(object):
:param table_source: The table source to register.
:type table_source: pyflink.table.TableSource
- .. note:: Deprecated in 1.10. Use :func:`connect` instead.
+ .. note:: Deprecated in 1.10. Use :func:`execute_sql` instead.
"""
warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
self._j_tenv.registerTableSourceInternal(name, table_source._j_table_source)
@@ -214,7 +214,7 @@ class TableEnvironment(object):
:param table_sink: The table sink to register.
:type table_sink: pyflink.table.TableSink
- .. note:: Deprecated in 1.10. Use :func:`connect` instead.
+ .. note:: Deprecated in 1.10. Use :func:`execute_sql` instead.
"""
warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
self._j_tenv.registerTableSinkInternal(name, table_sink._j_table_sink)
@@ -798,6 +798,8 @@ class TableEnvironment(object):
:return: A :class:`~pyflink.table.descriptors.ConnectTableDescriptor` used to build the
temporary table.
:rtype: pyflink.table.descriptors.ConnectTableDescriptor
+
+ .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
"""
pass
@@ -1397,7 +1399,10 @@ class StreamTableEnvironment(TableEnvironment):
:return: A :class:`~pyflink.table.descriptors.StreamTableDescriptor` used to build the
temporary table.
:rtype: pyflink.table.descriptors.StreamTableDescriptor
+
+ .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
"""
+ warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
return StreamTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
@@ -1531,7 +1536,10 @@ class BatchTableEnvironment(TableEnvironment):
to build the temporary table.
:rtype: pyflink.table.descriptors.BatchTableDescriptor or
pyflink.table.descriptors.StreamTableDescriptor
+
+ .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
"""
+ warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
gateway = get_gateway()
blink_t_env_class = get_java_class(
gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
index bf972f0..9b5dd00 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
@@ -442,8 +442,12 @@ public interface BatchTableEnvironment extends TableEnvironment {
* </pre>
*
* @param connectorDescriptor connector descriptor describing the external system
+ * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+ * might be refactored in the next versions. Please use {@link #executeSql(String) executeSql(ddl)}
+ * to register a table instead.
*/
@Override
+ @Deprecated
BatchTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index 6f80220..a287c9d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -636,8 +636,12 @@ public interface StreamTableEnvironment extends TableEnvironment {
* </pre>
*
* @param connectorDescriptor connector descriptor describing the external system
+ * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+ * might be refactored in the next versions. Please use {@link #executeSql(String) executeSql(ddl)}
+ * to register a table instead.
*/
@Override
+ @Deprecated
StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index dfe5054e..87fe7d8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -555,7 +555,7 @@ public interface TableEnvironment {
*
* @param name The name under which the {@link TableSource} is registered.
* @param tableSource The {@link TableSource} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
*/
@Deprecated
void registerTableSource(String name, TableSource<?> tableSource);
@@ -573,7 +573,7 @@ public interface TableEnvironment {
* @param fieldNames The field names to register with the {@link TableSink}.
* @param fieldTypes The field types to register with the {@link TableSink}.
* @param tableSink The {@link TableSink} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
*/
@Deprecated
void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
@@ -589,7 +589,7 @@ public interface TableEnvironment {
*
* @param name The name under which the {@link TableSink} is registered.
* @param configuredSink The configured {@link TableSink} to register.
- * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ * @deprecated Use {@link #executeSql(String) executeSql(ddl)} to register a table instead.
*/
@Deprecated
void registerTableSink(String name, TableSink<?> configuredSink);
@@ -728,7 +728,11 @@ public interface TableEnvironment {
*</pre>
*
* @param connectorDescriptor connector descriptor describing the external system
+ * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
+ * might be refactored in the next versions. Please use {@link #executeSql(String) executeSql(ddl)}
+ * to register a table instead.
*/
+ @Deprecated
ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
/**
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
index a769a56..829abf0 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
@@ -303,34 +303,38 @@ trait BatchTableEnvironment extends TableEnvironment {
override def execute(jobName: String): JobExecutionResult
/**
- * Creates a temporary table from a descriptor.
- *
- * Descriptors allow for declaring the communication to external systems in an
- * implementation-agnostic way. The classpath is scanned for suitable table factories that match
- * the desired configuration.
- *
- * The following example shows how to read from a connector using a JSON format and
- * registering a temporary table as "MyTable":
- *
- * {{{
- *
- * tableEnv
- * .connect(
- * new ExternalSystemXYZ()
- * .version("0.11"))
- * .withFormat(
- * new Json()
- * .jsonSchema("{...}")
- * .failOnMissingField(false))
- * .withSchema(
- * new Schema()
- * .field("user-name", "VARCHAR").from("u_name")
- * .field("count", "DECIMAL")
- * .createTemporaryTable("MyTable")
- * }}}
- *
- * @param connectorDescriptor connector descriptor describing the external system
- */
+ * Creates a temporary table from a descriptor.
+ *
+ * Descriptors allow for declaring the communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+ * the desired configuration.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * registering a temporary table as "MyTable":
+ *
+ * {{{
+ *
+ * tableEnv
+ * .connect(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .createTemporaryTable("MyTable")
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the external system
+ * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API.
+ * This method might be refactored in the next versions.
+ * Please use [[executeSql]] to register a table instead.
+ */
+ @deprecated
override def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
index dd0373e..b462d74 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
@@ -359,39 +359,43 @@ trait StreamTableEnvironment extends TableEnvironment {
override def execute(jobName: String): JobExecutionResult
/**
- * Creates a table source and/or table sink from a descriptor.
- *
- * Descriptors allow for declaring the communication to external systems in an
- * implementation-agnostic way. The classpath is scanned for suitable table factories that match
- * the desired configuration.
- *
- * The following example shows how to read from a Kafka connector using a JSON format and
- * registering a table source "MyTable" in append mode:
- *
- * {{{
- *
- * tableEnv
- * .connect(
- * new Kafka()
- * .version("0.11")
- * .topic("clicks")
- * .property("group.id", "click-group")
- * .startFromEarliest())
- * .withFormat(
- * new Json()
- * .jsonSchema("{...}")
- * .failOnMissingField(false))
- * .withSchema(
- * new Schema()
- * .field("user-name", "VARCHAR").from("u_name")
- * .field("count", "DECIMAL")
- * .field("proc-time", "TIMESTAMP").proctime())
- * .inAppendMode()
- * .createTemporaryTable("MyTable")
- * }}}
- *
- * @param connectorDescriptor connector descriptor describing the external system
- */
+ * Creates a table source and/or table sink from a descriptor.
+ *
+ * Descriptors allow for declaring the communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+ * the desired configuration.
+ *
+ * The following example shows how to read from a Kafka connector using a JSON format and
+ * registering a table source "MyTable" in append mode:
+ *
+ * {{{
+ *
+ * tableEnv
+ * .connect(
+ * new Kafka()
+ * .version("0.11")
+ * .topic("clicks")
+ * .property("group.id", "click-group")
+ * .startFromEarliest())
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .field("proc-time", "TIMESTAMP").proctime())
+ * .inAppendMode()
+ * .createTemporaryTable("MyTable")
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the external system
+ * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API.
+ * This method might be refactored in the next versions.
+ * Please use [[executeSql]] to register a table instead.
+ */
+ @deprecated
override def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor
}