You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/09/28 05:01:37 UTC
[flink] branch master updated: [FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 900071b [FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream
900071b is described below
commit 900071bb7a9073f67b8af1097ee858e59626593c
Author: SteNicholas <pr...@163.com>
AuthorDate: Sun Sep 27 18:39:45 2020 +0800
[FLINK-19417][python] Fix the implementation of StreamTableEnvironment.from_data_stream
This closes #13491.
---
flink-python/pyflink/table/table_environment.py | 25 ++++++++++++++++------
.../table/tests/test_table_environment_api.py | 9 ++++++++
2 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index db748a4..6f2cbeb 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1747,7 +1747,7 @@ class StreamTableEnvironment(TableEnvironment):
stream_execution_environment._j_stream_execution_environment)
return StreamTableEnvironment(j_tenv)
- def from_data_stream(self, data_stream: DataStream, fields: List[str] = None) -> Table:
+ def from_data_stream(self, data_stream: DataStream, *fields: Union[str, Expression]) -> Table:
"""
Converts the given DataStream into a Table with specified field names.
@@ -1769,12 +1769,21 @@ class StreamTableEnvironment(TableEnvironment):
:param fields: The fields expressions to map original fields of the DataStream to the fields
of the Table
:return: The converted Table.
+
+ .. versionadded:: 1.12.0
"""
- if fields is not None:
- j_table = self._j_tenv.fromDataStream(data_stream._j_data_stream, fields)
- else:
- j_table = self._j_tenv.fromDataStream(data_stream._j_data_stream)
- return Table(j_table=j_table, t_env=self._j_tenv)
+ j_data_stream = data_stream._j_data_stream
+ if len(fields) == 0:
+ return Table(j_table=self._j_tenv.fromDataStream(j_data_stream), t_env=self)
+ elif all(isinstance(f, Expression) for f in fields):
+ return Table(j_table=self._j_tenv.fromDataStream(
+ j_data_stream, to_expression_jarray(fields)), t_env=self)
+ elif len(fields) == 1 and isinstance(fields[0], str):
+ warnings.warn(
+ "Deprecated in 1.12. Use from_data_stream(DataStream, *Expression) instead.",
+ DeprecationWarning)
+ return Table(j_table=self._j_tenv.fromDataStream(j_data_stream, fields[0]), t_env=self)
+ raise ValueError("Invalid arguments for 'fields': %r" % fields)
def to_append_stream(self, table: Table, type_info: TypeInformation) -> DataStream:
"""
@@ -1788,6 +1797,8 @@ class StreamTableEnvironment(TableEnvironment):
:param table: The Table to convert.
:param type_info: The TypeInformation that specifies the type of the DataStream.
:return: The converted DataStream.
+
+ .. versionadded:: 1.12.0
"""
j_data_stream = self._j_tenv.toAppendStream(table._j_table, type_info.get_java_type_info())
return DataStream(j_data_stream=j_data_stream)
@@ -1806,6 +1817,8 @@ class StreamTableEnvironment(TableEnvironment):
:param table: The Table to convert.
:param type_info: The TypeInformation of the requested record type.
:return: The converted DataStream.
+
+ .. versionadded:: 1.12.0
"""
j_data_stream = self._j_tenv.toRetractStream(table._j_table, type_info.get_java_type_info())
return DataStream(j_data_stream=j_data_stream)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 10210ea..783e591 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -32,6 +32,7 @@ from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, Envir
Module, ResultKind
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.expressions import col
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import BatchTableEnvironment
from pyflink.table.types import RowType
@@ -411,6 +412,14 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
expected = ['1,Hi,Hello', '2,Hello,Hi']
self.assert_equals(result, expected)
+ table = t_env.from_data_stream(ds, col('a'), col('b'), col('c'))
+ t_env.register_table_sink("ExprSink",
+ source_sink_utils.TestAppendSink(field_names, field_types))
+ t_env.insert_into("ExprSink", table)
+ t_env.execute("test_from_data_stream_with_expr")
+ result = source_sink_utils.results()
+ self.assert_equals(result, expected)
+
def test_to_append_stream(self):
self.env.set_parallelism(1)
t_env = StreamTableEnvironment.create(