You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by di...@apache.org on 2020/09/28 05:01:37 UTC

[flink] branch master updated: [FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 900071b  [FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream
900071b is described below

commit 900071bb7a9073f67b8af1097ee858e59626593c
Author: SteNicholas <pr...@163.com>
AuthorDate: Sun Sep 27 18:39:45 2020 +0800

    [FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream
    
    This closes #13491.
---
 flink-python/pyflink/table/table_environment.py    | 25 ++++++++++++++++------
 .../table/tests/test_table_environment_api.py      |  9 ++++++++
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index db748a4..6f2cbeb 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1747,7 +1747,7 @@ class StreamTableEnvironment(TableEnvironment):
                     stream_execution_environment._j_stream_execution_environment)
         return StreamTableEnvironment(j_tenv)
 
-    def from_data_stream(self, data_stream: DataStream, fields: List[str] = None) -> Table:
+    def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expression]) -> Table:
         """
         Converts the given DataStream into a Table with specified field names.
 
@@ -1769,12 +1769,21 @@ class StreamTableEnvironment(TableEnvironment):
         :param fields: The fields expressions to map original fields of the DataStream to the fields
                        of the Table
         :return: The converted Table.
+
+        .. versionadded:: 1.12.0
         """
-        if fields is not None:
-            j_table = self._j_tenv.fromDataStream(data_stream._j_data_stream, fields)
-        else:
-            j_table = self._j_tenv.fromDataStream(data_stream._j_data_stream)
-        return Table(j_table=j_table, t_env=self._j_tenv)
+        j_data_stream = data_stream._j_data_stream
+        if len(fields) == 0:
+            return Table(j_table=self._j_tenv.fromDataStream(j_data_stream), t_env=self)
+        elif all(isinstance(f, Expression) for f in fields):
+            return Table(j_table=self._j_tenv.fromDataStream(
+                j_data_stream, to_expression_jarray(fields)), t_env=self)
+        elif len(fields) == 1 and isinstance(fields[0], str):
+            warnings.warn(
+                "Deprecated in 1.12. Use from_data_stream(DataStream, *Expression) instead.",
+                DeprecationWarning)
+            return Table(j_table=self._j_tenv.fromDataStream(j_data_stream, fields[0]), t_env=self)
+        raise ValueError("Invalid arguments for 'fields': %r" % fields)
 
     def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream:
         """
@@ -1788,6 +1797,8 @@ class StreamTableEnvironment(TableEnvironment):
         :param table: The Table to convert.
         :param type_info: The TypeInformation that specifies the type of the DataStream.
         :return: The converted DataStream.
+
+        .. versionadded:: 1.12.0
         """
         j_data_stream = self._j_tenv.toAppendStream(table._j_table, type_info.get_java_type_info())
         return DataStream(j_data_stream=j_data_stream)
@@ -1806,6 +1817,8 @@ class StreamTableEnvironment(TableEnvironment):
         :param table: The Table to convert.
         :param type_info: The TypeInformation of the requested record type.
         :return: The converted DataStream.
+
+        .. versionadded:: 1.12.0
         """
         j_data_stream = self._j_tenv.toRetractStream(table._j_table, type_info.get_java_type_info())
         return DataStream(j_data_stream=j_data_stream)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 10210ea..783e591 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -32,6 +32,7 @@ from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, Envir
     Module, ResultKind
 from pyflink.table.descriptors import FileSystem, OldCsv, Schema
 from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.expressions import col
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_environment import BatchTableEnvironment
 from pyflink.table.types import RowType
@@ -411,6 +412,14 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
         expected = ['1,Hi,Hello', '2,Hello,Hi']
         self.assert_equals(result, expected)
 
+        table = t_env.from_data_stream(ds, col('a'), col('b'), col('c'))
+        t_env.register_table_sink("ExprSink",
+                                  source_sink_utils.TestAppendSink(field_names, field_types))
+        t_env.insert_into("ExprSink", table)
+        t_env.execute("test_from_data_stream_with_expr")
+        result = source_sink_utils.results()
+        self.assert_equals(result, expected)
+
     def test_to_append_stream(self):
         self.env.set_parallelism(1)
         t_env = StreamTableEnvironment.create(