Posted to commits@flink.apache.org by hx...@apache.org on 2022/07/05 11:17:01 UTC
[flink] branch master updated: [FLINK-25231][python] Update PyFlink to use the new type system
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a951831ddda [FLINK-25231][python] Update PyFlink to use the new type system
a951831ddda is described below
commit a951831dddaa163e7d97d4118984d238bb0164a1
Author: acquachen <ac...@gmail.com>
AuthorDate: Fri Mar 18 10:23:03 2022 +0800
[FLINK-25231][python] Update PyFlink to use the new type system
This closes #19140.
---
.../datastream/stream_execution_environment.py | 9 +-
flink-python/pyflink/table/descriptors.py | 4 +-
flink-python/pyflink/table/expressions.py | 21 +-
flink-python/pyflink/table/sinks.py | 12 +-
flink-python/pyflink/table/sources.py | 5 +-
flink-python/pyflink/table/table_environment.py | 25 +-
flink-python/pyflink/table/table_result.py | 4 +-
flink-python/pyflink/table/table_schema.py | 15 +-
flink-python/pyflink/table/tests/test_calc.py | 38 +-
flink-python/pyflink/table/tests/test_correlate.py | 20 +-
.../pyflink/table/tests/test_dependency.py | 38 +-
.../pyflink/table/tests/test_descriptor.py | 24 +-
.../pyflink/table/tests/test_pandas_conversion.py | 29 +-
.../pyflink/table/tests/test_pandas_udaf.py | 201 ++++----
.../pyflink/table/tests/test_pandas_udf.py | 38 +-
.../table/tests/test_row_based_operation.py | 58 +--
flink-python/pyflink/table/tests/test_sql.py | 20 +-
.../table/tests/test_table_environment_api.py | 39 +-
flink-python/pyflink/table/tests/test_types.py | 56 +--
flink-python/pyflink/table/tests/test_udaf.py | 95 ++--
flink-python/pyflink/table/tests/test_udf.py | 111 ++---
flink-python/pyflink/table/tests/test_udtf.py | 15 +-
flink-python/pyflink/table/types.py | 208 ++------
flink-python/pyflink/table/udf.py | 12 +-
flink-python/pyflink/testing/source_sink_utils.py | 39 +-
.../flink/streaming/api/utils/PythonTypeUtils.java | 531 +++++++++++++++++++++
.../utils/python/PythonDynamicTableFactory.java | 69 +++
.../utils/python/PythonDynamicTableOptions.java | 38 ++
.../utils/python/PythonDynamicTableSource.java | 72 +++
.../utils/python/PythonInputFormatTableSource.java | 68 ---
.../flink/table/utils/python/PythonTableUtils.java | 512 +++++++++-----------
.../org.apache.flink.table.factories.Factory} | 2 +-
.../table/legacyutils/ByteMaxAggFunction.java | 76 ---
.../flink/table/legacyutils/CustomAssigner.java | 32 --
.../flink/table/legacyutils/CustomExtractor.java | 69 ---
.../flink/table/legacyutils/MaxAccumulator.java | 25 -
.../apache/flink/table/legacyutils/RichFunc0.java | 69 ---
.../flink/table/legacyutils/RowCollector.java | 79 ---
.../apache/flink/table/legacyutils/RowSink.java | 32 --
.../apache/flink/table/legacyutils/TableFunc1.java | 42 --
.../flink/table/legacyutils/TestAppendSink.java | 73 ---
.../legacyutils/TestCollectionTableFactory.java | 307 ------------
.../flink/table/legacyutils/TestRetractSink.java | 63 ---
.../flink/table/legacyutils/TestUpsertSink.java | 87 ----
.../table/utils/TestCollectionTableFactory.java | 321 +++++++++++++
.../flink/table/utils/TestingDescriptors.java | 109 +++++
.../apache/flink/table/utils/TestingFunctions.java | 164 +++++++
.../flink/table/utils/TestingSinkTableFactory.java | 50 ++
.../org/apache/flink/table/utils/TestingSinks.java | 149 ++++++
...ry => org.apache.flink.table.factories.Factory} | 3 +-
.../functions/python/PythonScalarFunction.java | 21 +-
.../functions/python/PythonTableFunction.java | 22 +-
52 files changed, 2246 insertions(+), 1975 deletions(-)
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 8418f20bc01..7d5d9d4d4da 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -933,12 +933,11 @@ class StreamExecutionEnvironment(object):
else:
j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name)
out_put_type_info = type_info
- # Since flink python module depends on table module, we can make use of utils of it when
- # implementing python DataStream API.
- PythonTableUtils = gateway.jvm\
- .org.apache.flink.table.utils.python.PythonTableUtils
+
+ PythonTypeUtils = gateway.jvm\
+ .org.apache.flink.streaming.api.utils.PythonTypeUtils
execution_config = self._j_stream_execution_environment.getConfig()
- j_input_format = PythonTableUtils.getCollectionInputFormat(
+ j_input_format = PythonTypeUtils.getCollectionInputFormat(
j_objs,
out_put_type_info.get_java_type_info(),
execution_config
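
The net effect of this first hunk: building the collection InputFormat no longer reaches from the DataStream API into the table module; the streaming-side PythonTypeUtils added by this commit provides it instead. The user-facing API is unchanged. A minimal sketch, assuming a standard PyFlink setup:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # from_collection still serializes the Python objects to a temp file and
    # builds the Java InputFormat, now via PythonTypeUtils.getCollectionInputFormat.
    ds = env.from_collection(
        [(1, 'Hi'), (2, 'Hello')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.print()
    env.execute('collection_source_sketch')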
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 1d1ea823330..c2c1e643a60 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -24,7 +24,7 @@ from typing import Dict, Union
from pyflink.java_gateway import get_gateway
from pyflink.table.table_schema import TableSchema
-from pyflink.table.types import _to_java_type, DataType
+from pyflink.table.types import DataType, _to_java_data_type
__all__ = [
'Rowtime',
@@ -219,7 +219,7 @@ class Schema(Descriptor):
if isinstance(field_type, str):
self._j_schema = self._j_schema.field(field_name, field_type)
else:
- self._j_schema = self._j_schema.field(field_name, _to_java_type(field_type))
+ self._j_schema = self._j_schema.field(field_name, _to_java_data_type(field_type))
return self
def fields(self, fields: Dict[str, Union[DataType, str]]) -> 'Schema':
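
The descriptor API gets the same substitution: DataType fields are converted with _to_java_data_type instead of the legacy TypeInformation bridge, while string type names pass through untouched. A minimal usage sketch (behavior is unchanged for callers):

    from pyflink.table import DataTypes
    from pyflink.table.descriptors import Schema

    schema = (Schema()
              .field('id', DataTypes.BIGINT())  # goes through _to_java_data_type
              .field('name', 'STRING'))         # string names pass to Java as-is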
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index d870f825e20..503caf852fa 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -20,8 +20,8 @@ from typing import Union
from pyflink import add_version_doc
from pyflink.java_gateway import get_gateway
from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit, JsonOnNull
-from pyflink.table.types import _to_java_data_type, DataType, _to_java_type
-from pyflink.table.udf import UserDefinedFunctionWrapper, UserDefinedTableFunctionWrapper
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
from pyflink.util.java_utils import to_jarray, load_java_class
__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOUNDED_ROW',
@@ -767,21 +767,6 @@ def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
return Expression(gateway.jvm.Expressions.call(
f, to_jarray(gateway.jvm.Object, [_get_java_expression(arg) for arg in args])))
- def get_function_definition(f):
- if isinstance(f, UserDefinedTableFunctionWrapper):
- """
- TypeInference was not supported for TableFunction in the old planner. Use
- TableFunctionDefinition to work around this issue.
- """
- j_result_types = to_jarray(gateway.jvm.TypeInformation,
- [_to_java_type(i) for i in f._result_types])
- j_result_type = gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
- j_result_types)
- return gateway.jvm.org.apache.flink.table.functions.TableFunctionDefinition(
- 'f', f._java_user_defined_function(), j_result_type)
- else:
- return f._java_user_defined_function()
-
expressions_clz = load_java_class("org.apache.flink.table.api.Expressions")
function_definition_clz = load_java_class('org.apache.flink.table.functions.FunctionDefinition')
j_object_array_type = to_jarray(gateway.jvm.Object, []).getClass()
@@ -794,7 +779,7 @@ def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
return Expression(api_call_method.invoke(
None,
to_jarray(gateway.jvm.Object,
- [get_function_definition(f),
+ [f._java_user_defined_function(),
to_jarray(gateway.jvm.Object, [_get_java_expression(arg) for arg in args])])))
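
The deleted get_function_definition helper existed only because the old planner had no type inference for table functions, so call() had to wrap them in TableFunctionDefinition. With the new type system, every UDF hands over its Java function definition directly. A minimal sketch of the now-uniform path:

    from pyflink.table import DataTypes
    from pyflink.table.expressions import call, col
    from pyflink.table.udf import udtf

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: str):
        for word in line.split('#'):
            yield word

    # Table functions now take the same branch as scalar functions:
    # call() invokes f._java_user_defined_function() with no wrapper.
    word_expr = call(split, col('words')).alias('word')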
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 26df8a040f9..e6091ab0265 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -17,7 +17,7 @@
################################################################################
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type
+from pyflink.table.types import _to_java_data_type
from pyflink.util import java_utils
__all__ = ['TableSink', 'CsvTableSink', 'WriteMode']
@@ -68,11 +68,11 @@ class CsvTableSink(TableSink):
j_write_mode = None
else:
raise Exception('Unsupported write_mode: %s' % write_mode)
- j_csv_table_sink = gateway.jvm.CsvTableSink(
- path, field_delimiter, num_files, j_write_mode)
j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = java_utils.to_jarray(
- gateway.jvm.TypeInformation,
- [_to_java_type(field_type) for field_type in field_types])
- j_csv_table_sink = j_csv_table_sink.configure(j_field_names, j_field_types)
+ gateway.jvm.DataType,
+ [_to_java_data_type(field_type) for field_type in field_types])
+ j_csv_table_sink = gateway.jvm.CsvTableSink(
+ path, field_delimiter, num_files, j_write_mode, j_field_names, j_field_types)
+
super(CsvTableSink, self).__init__(j_csv_table_sink)
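
CsvTableSink is now fully configured at construction time: field names and DataTypes go straight into the Java constructor, replacing the two-step configure() call that took TypeInformation. A usage sketch, assuming the Python signature keeps field_names and field_types as its leading parameters (as the surrounding __init__ suggests):

    from pyflink.table import DataTypes
    from pyflink.table.sinks import CsvTableSink

    sink = CsvTableSink(
        ['a', 'b'],                                # field names
        [DataTypes.BIGINT(), DataTypes.STRING()],  # DataTypes, not TypeInformation
        '/tmp/results.csv',
        field_delimiter='|',
        num_files=1)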
diff --git a/flink-python/pyflink/table/sources.py b/flink-python/pyflink/table/sources.py
index cbce0be0d60..3e0df45e8ea 100644
--- a/flink-python/pyflink/table/sources.py
+++ b/flink-python/pyflink/table/sources.py
@@ -17,8 +17,7 @@
################################################################################
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type
-
+from pyflink.table.types import _to_java_data_type
__all__ = ['TableSource', 'CsvTableSource']
@@ -82,7 +81,7 @@ class CsvTableSource(TableSource):
builder.path(source_path)
for (field_name, field_type) in zip(field_names, field_types):
- builder.field(field_name, _to_java_type(field_type))
+ builder.field(field_name, _to_java_data_type(field_type))
if field_delim is not None:
builder.fieldDelimiter(field_delim)
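
CsvTableSource gets the same treatment: the builder's field() calls now receive Java DataTypes. A minimal sketch, assuming the established Python signature:

    from pyflink.table import DataTypes
    from pyflink.table.sources import CsvTableSource

    source = CsvTableSource(
        '/tmp/input.csv',
        ['id', 'name'],
        [DataTypes.BIGINT(), DataTypes.STRING()],
        field_delim=',')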
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 544b796bd0c..f7a49fe28c1 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import atexit
import os
import sys
import tempfile
@@ -40,7 +41,7 @@ from pyflink.table.statement_set import StatementSet
from pyflink.table.table_config import TableConfig
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.table_result import TableResult
-from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
+from pyflink.table.types import _create_type_verifier, RowType, DataType, \
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema, \
_to_java_data_type
from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
@@ -1426,7 +1427,7 @@ class TableEnvironment(object):
elements = [schema.to_sql_type(element) for element in elements]
return self._from_elements(elements, schema)
- def _from_elements(self, elements: List, schema: Union[DataType, List[str]]) -> Table:
+ def _from_elements(self, elements: List, schema: DataType) -> Table:
"""
Creates a table from a collection of elements.
@@ -1439,22 +1440,15 @@ class TableEnvironment(object):
try:
with temp_file:
serializer.serialize(elements, temp_file)
- row_type_info = _to_java_type(schema)
- execution_config = self._get_j_env().getConfig()
+ j_schema = _to_java_data_type(schema)
gateway = get_gateway()
- j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
PythonTableUtils = gateway.jvm \
.org.apache.flink.table.utils.python.PythonTableUtils
- PythonInputFormatTableSource = gateway.jvm \
- .org.apache.flink.table.utils.python.PythonInputFormatTableSource
- j_input_format = PythonTableUtils.getInputFormat(
- j_objs, row_type_info, execution_config)
- j_table_source = PythonInputFormatTableSource(
- j_input_format, row_type_info)
-
- return Table(self._j_tenv.fromTableSource(j_table_source), self)
+ j_table = PythonTableUtils.createTableFromElement(
+ self._j_tenv, temp_file.name, j_schema, True)
+ return Table(j_table, self)
finally:
- os.unlink(temp_file.name)
+ atexit.register(lambda: os.unlink(temp_file.name))
def from_pandas(self, pdf,
schema: Union[RowType, List[str], Tuple[str], List[DataType],
@@ -1526,8 +1520,7 @@ class TableEnvironment(object):
serializer.serialize(data, temp_file)
jvm = get_gateway().jvm
- data_type = jvm.org.apache.flink.table.types.utils.TypeConversions\
- .fromLegacyInfoToDataType(_to_java_type(result_type)).notNull()
+ data_type = _to_java_data_type(result_type).notNull()
data_type = data_type.bridgedTo(
load_java_class('org.apache.flink.table.data.RowData'))
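
Two things change in table_environment.py. First, _from_elements delegates to the new PythonTableUtils.createTableFromElement instead of hand-assembling an InputFormat plus the now-deleted PythonInputFormatTableSource, and from_pandas converts its result type directly with _to_java_data_type rather than routing through fromLegacyInfoToDataType. Second, the temp file holding the serialized elements can no longer be unlinked immediately, since the Java side may read it lazily when the job runs; cleanup is deferred to interpreter shutdown. A minimal sketch of that deferred-cleanup pattern:

    import atexit
    import os
    import tempfile

    temp_file = tempfile.NamedTemporaryFile(delete=False)
    try:
        with temp_file:
            temp_file.write(b'serialized elements')  # stand-in payload
        # temp_file.name is handed to the JVM here and may be read only
        # when the job actually executes ...
    finally:
        # ... so delete at interpreter exit instead of right away.
        atexit.register(lambda: os.unlink(temp_file.name))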
diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
index 58666e09e28..6c982676d43 100644
--- a/flink-python/pyflink/table/table_result.py
+++ b/flink-python/pyflink/table/table_result.py
@@ -25,7 +25,7 @@ from pyflink.common.job_client import JobClient
from pyflink.java_gateway import get_gateway
from pyflink.table.result_kind import ResultKind
from pyflink.table.table_schema import TableSchema
-from pyflink.table.types import _from_java_type
+from pyflink.table.types import _from_java_data_type
from pyflink.table.utils import pickled_bytes_to_python_converter
__all__ = ['TableResult', 'CloseableIterator']
@@ -226,7 +226,7 @@ class CloseableIterator(object):
def __init__(self, j_closeable_iterator, field_data_types):
self._j_closeable_iterator = j_closeable_iterator
self._j_field_data_types = field_data_types
- self._data_types = [_from_java_type(j_field_data_type)
+ self._data_types = [_from_java_data_type(j_field_data_type)
for j_field_data_type in self._j_field_data_types]
def __iter__(self):
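
For result iteration, only the internal Java-to-Python conversion changes (_from_java_data_type on DataTypes instead of _from_java_type on TypeInformation); collecting rows looks the same from user code. A sketch, assuming an existing t_env:

    # TableResult.collect() returns this CloseableIterator; the field
    # types it converts now come from _from_java_data_type.
    with t_env.execute_sql('SELECT 1 AS a, CAST(2.0 AS DOUBLE) AS b').collect() as rows:
        for row in rows:
            print(row)  # one Row per result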
diff --git a/flink-python/pyflink/table/table_schema.py b/flink-python/pyflink/table/table_schema.py
index 3c4b5eab0e3..fd2dbd941fd 100644
--- a/flink-python/pyflink/table/table_schema.py
+++ b/flink-python/pyflink/table/table_schema.py
@@ -18,7 +18,7 @@
from typing import List, Optional, Union
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type, _from_java_type, DataType, RowType
+from pyflink.table.types import DataType, RowType, _to_java_data_type, _from_java_data_type
from pyflink.util.java_utils import to_jarray
__all__ = ['TableSchema']
@@ -34,9 +34,10 @@ class TableSchema(object):
if j_table_schema is None:
gateway = get_gateway()
j_field_names = to_jarray(gateway.jvm.String, field_names)
- j_data_types = to_jarray(gateway.jvm.TypeInformation,
- [_to_java_type(item) for item in data_types])
- self._j_table_schema = gateway.jvm.TableSchema(j_field_names, j_data_types)
+ j_data_types = to_jarray(gateway.jvm.DataType,
+ [_to_java_data_type(item) for item in data_types])
+ self._j_table_schema = gateway.jvm.TableSchema.builder()\
+ .fields(j_field_names, j_data_types).build()
else:
self._j_table_schema = j_table_schema
@@ -54,7 +55,7 @@ class TableSchema(object):
:return: A list of all field data types.
"""
- return [_from_java_type(item) for item in self._j_table_schema.getFieldDataTypes()]
+ return [_from_java_data_type(item) for item in self._j_table_schema.getFieldDataTypes()]
def get_field_data_type(self, field: Union[int, str]) -> Optional[DataType]:
"""
@@ -67,7 +68,7 @@ class TableSchema(object):
raise TypeError("Expected field index or field name, got %s" % type(field))
optional_result = self._j_table_schema.getFieldDataType(field)
if optional_result.isPresent():
- return _from_java_type(optional_result.get())
+ return _from_java_data_type(optional_result.get())
else:
return None
@@ -107,7 +108,7 @@ class TableSchema(object):
:return: The row data type.
"""
- return _from_java_type(self._j_table_schema.toRowDataType())
+ return _from_java_data_type(self._j_table_schema.toRowDataType())
def __repr__(self):
return self._j_table_schema.toString()
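
TableSchema construction switches from the removed TypeInformation-based constructor to the Java builder, and every accessor round-trips through DataTypes. A minimal sketch, assuming the field_names/data_types parameters shown in the surrounding __init__:

    from pyflink.table import DataTypes
    from pyflink.table.table_schema import TableSchema

    schema = TableSchema(['id', 'name'],
                         [DataTypes.BIGINT(), DataTypes.STRING()])
    print(schema.get_field_data_types())  # [BIGINT, STRING] as Python DataTypes
    print(schema.to_row_data_type())      # ROW<id BIGINT, name STRING>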
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 4f12a5c0104..4fc7955986a 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -85,8 +85,30 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
field_names,
field_types)))
- table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
- t_env.register_table_sink("Results", table_sink)
+
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a BIGINT,
+ b DOUBLE,
+ c STRING,
+ d STRING,
+ e DATE,
+ f TIME,
+ g TIMESTAMP(3),
+ h INT,
+ i ARRAY<DOUBLE>,
+ j ARRAY<DOUBLE NOT NULL>,
+ k ARRAY<STRING>,
+ l ARRAY<DATE>,
+ m DECIMAL(38, 18),
+ n ROW<a BIGINT, b DOUBLE>,
+ o MAP<STRING, DOUBLE>,
+ p BYTES,
+ q ARRAY<DOUBLE NOT NULL>)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
t = t_env.from_elements(
[(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), datetime.time(1, 0, 0),
datetime.datetime(1970, 1, 2, 0, 0),
@@ -98,8 +120,8 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
t.execute_insert("Results").wait()
actual = source_sink_utils.results()
- expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00:00, 1970-01-02 00:00:00.0, '
- '86400000, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
+ expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00, 1970-01-02T00:00, '
+ '86400, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
'1.000000000000000000, +I[1, 2.0], {key=1.0}, [65, 66, 67, 68], [3.0, 4.0]]']
self.assert_equals(actual, expected)
@@ -113,8 +135,12 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
field_names,
field_types)))
- table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
- t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b STRING, c FLOAT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
t = t_env.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], schema)
t.execute_insert("Results").wait()
actual = source_sink_utils.results()
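
This is the recurring pattern across the rest of the test changes: the removed TestAppendSink registration is replaced by DDL against the test-only 'test-sink' connector (backed by the new TestingSinkTableFactory), and expected strings move to java.time rendering ('1970-01-02T00:00' rather than java.sql's '1970-01-02 00:00:00.0'). Sketched with the surrounding test fixtures (t_env, t, source_sink_utils) assumed:

    # Before: imperative registration of a legacy sink
    #   t_env.register_table_sink('Results',
    #       source_sink_utils.TestAppendSink(field_names, field_types))
    # After: declarative DDL against the test connector
    t_env.execute_sql("""
        CREATE TABLE Results(a BIGINT, b STRING)
        WITH ('connector' = 'test-sink')
    """)
    t.execute_insert('Results').wait()
    actual = source_sink_utils.results()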
diff --git a/flink-python/pyflink/table/tests/test_correlate.py b/flink-python/pyflink/table/tests/test_correlate.py
index 6ac1d020166..f2616924c7c 100644
--- a/flink-python/pyflink/table/tests/test_correlate.py
+++ b/flink-python/pyflink/table/tests/test_correlate.py
@@ -23,8 +23,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
def test_join_lateral(self):
t_env = self.t_env
- t_env.create_java_temporary_system_function("split",
- "org.apache.flink.table.legacyutils.TableFunc1")
+ t_env.create_java_temporary_system_function(
+ "split",
+ "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
result = source.join_lateral(expr.call('split', source.words).alias('word'))
@@ -36,8 +37,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
def test_join_lateral_with_join_predicate(self):
t_env = self.t_env
- t_env.create_java_temporary_system_function("split",
- "org.apache.flink.table.legacyutils.TableFunc1")
+ t_env.create_java_temporary_system_function(
+ "split",
+ "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
result = source.join_lateral(expr.call('split', source.words).alias('word'),
@@ -51,8 +53,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
def test_left_outer_join_lateral(self):
t_env = self.t_env
- t_env.create_java_temporary_system_function("split",
- "org.apache.flink.table.legacyutils.TableFunc1")
+ t_env.create_java_temporary_system_function(
+ "split",
+ "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
result = source.left_outer_join_lateral(expr.call('split', source.words).alias('word'))
@@ -64,8 +67,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
def test_left_outer_join_lateral_with_join_predicate(self):
t_env = self.t_env
- t_env.create_java_temporary_system_function("split",
- "org.apache.flink.table.legacyutils.TableFunc1")
+ t_env.create_java_temporary_system_function(
+ "split",
+ "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
# only support "true" as the join predicate currently
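
The replacement functions are nested classes of TestingFunctions, so they are registered under their JVM binary names, with '$' separating outer and inner class:

    # A nested Java class Outer.Inner is addressed as 'pkg.Outer$Inner':
    t_env.create_java_temporary_system_function(
        'split', 'org.apache.flink.table.utils.TestingFunctions$TableFunc1')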
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 025eba9e884..0e5ca9491a5 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -56,9 +56,10 @@ class DependencyTests(object):
self.t_env.create_temporary_system_function(
"add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call("add_two", t.a), t.a).execute_insert("Results").wait()
@@ -82,9 +83,12 @@ class DependencyTests(object):
self.t_env.create_temporary_system_function("add_from_file",
udf(add_from_file, DataTypes.BIGINT(),
DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
@@ -130,9 +134,11 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
self.st_env.create_temporary_system_function(
"check_requirements",
udf(check_requirements, DataTypes.BIGINT(), DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.st_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.st_env.execute_sql(sink_table_ddl)
+
t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('check_requirements', t.a), t.a).execute_insert("Results").wait()
@@ -176,9 +182,10 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
self.st_env.create_temporary_system_function(
"add_one",
udf(add_one, DataTypes.BIGINT(), DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.st_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.st_env.execute_sql(sink_table_ddl)
t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('add_one', t.a), t.a).execute_insert("Results").wait()
@@ -208,9 +215,10 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.st_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.st_env.execute_sql(sink_table_ddl)
t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(
expr.call('check_python_exec', t.a),
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index e63429bad08..9bbdf96c744 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -42,18 +42,18 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
def test_timestamps_from_extractor(self):
rowtime = Rowtime().timestamps_from_extractor(
- "org.apache.flink.table.legacyutils.CustomExtractor")
+ "org.apache.flink.table.utils.TestingDescriptors$CustomExtractor")
properties = rowtime.to_properties()
expected = {
'rowtime.timestamps.type': 'custom',
'rowtime.timestamps.class':
- 'org.apache.flink.table.legacyutils.CustomExtractor',
+ 'org.apache.flink.table.utils.TestingDescriptors$CustomExtractor',
'rowtime.timestamps.serialized':
- 'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4'
- 'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
- '50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
- 'AACdHM'}
+ 'rO0ABXNyAD9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnV0aWxzLlRlc3RpbmdEZXNjcmlwdG9ycyRDdXN0b2'
+ '1FeHRyYWN0b3K-MntVKO8Z7QIAAUwABWZpZWxkdAASTGphdmEvbGFuZy9TdHJpbmc7eHIAPm9yZy5hcGFj'
+ 'aGUuZmxpbmsudGFibGUuc291cmNlcy50c2V4dHJhY3RvcnMuVGltZXN0YW1wRXh0cmFjdG9yX9WOqYhTbB'
+ 'gCAAB4cHQAAnRz'}
self.assertEqual(expected, properties)
def test_watermarks_periodic_ascending(self):
@@ -80,18 +80,18 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
def test_watermarks_from_strategy(self):
rowtime = Rowtime().watermarks_from_strategy(
- "org.apache.flink.table.legacyutils.CustomAssigner")
+ "org.apache.flink.table.utils.TestingDescriptors$CustomAssigner")
properties = rowtime.to_properties()
expected = {
'rowtime.watermarks.type': 'custom',
'rowtime.watermarks.class':
- 'org.apache.flink.table.legacyutils.CustomAssigner',
+ 'org.apache.flink.table.utils.TestingDescriptors$CustomAssigner',
'rowtime.watermarks.serialized':
- 'rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUFzc2lnbmVyu_8'
- 'TLNBQBsACAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW'
- '5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhY'
- 'mxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OWaT4CAAB4cA'}
+ 'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnV0aWxzLlRlc3RpbmdEZXNjcmlwdG9ycyRDdXN0b2'
+ '1Bc3NpZ25lcsY_Xt96bBjDAgAAeHIAR29yZy5hcGFjaGUuZmxpbmsudGFibGUuc291cmNlcy53bXN0cmF0'
+ 'ZWdpZXMuUHVuY3R1YXRlZFdhdGVybWFya0Fzc2lnbmVygVHOe6GlrvQCAAB4cgA9b3JnLmFwYWNoZS5mbG'
+ 'luay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5XYXRlcm1hcmtTdHJhdGVned57foNjlmk-AgAAeHA'}
self.assertEqual(expected, properties)
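
The 'serialized' properties hold the Base64-encoded Java serialization of the extractor/assigner instances, so relocating the classes necessarily changes the blobs as well as the class-name properties. A sketch of how the property map is produced, assuming the test classes are on the JVM classpath:

    from pyflink.table.descriptors import Rowtime

    props = Rowtime().timestamps_from_extractor(
        'org.apache.flink.table.utils.TestingDescriptors$CustomExtractor'
    ).to_properties()
    # Base64 of the java.io-serialized extractor instance:
    print(props['rowtime.timestamps.serialized'])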
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 43c3b5e2d2d..703fb9b512f 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -121,17 +121,34 @@ class PandasConversionITTests(PandasConversionTestBase):
self.assertEqual(self.data_type, table.get_schema().to_row_data_type())
table = table.filter(table.f2 < 2)
- table_sink = source_sink_utils.TestAppendSink(
- self.data_type.field_names(),
- self.data_type.field_types())
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ f1 TINYINT,
+ f2 SMALLINT,
+ f3 INT,
+ f4 BIGINT,
+ f5 BOOLEAN,
+ f6 FLOAT,
+ f7 DOUBLE,
+ f8 STRING,
+ f9 BYTES,
+ f10 DECIMAL(38, 18),
+ f11 DATE,
+ f12 TIME,
+ f13 TIMESTAMP(3),
+ f14 ARRAY<STRING>,
+ f15 ROW<a INT, b STRING, c TIMESTAMP(3), d ARRAY<INT>>)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+
table.execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
["+I[1, 1, 1, 1, true, 1.1, 1.2, hello, [97, 97, 97], "
"1000000000000000000.010000000000000000, 2014-09-13, 01:00:01, "
- "1970-01-01 00:00:00.123, [hello, 中文], +I[1, hello, "
- "1970-01-01 00:00:00.123, [1, 2]]]"])
+ "1970-01-01T00:00:00.123, [hello, 中文], +I[1, hello, "
+ "1970-01-01T00:00:00.123, [1, 2]]]"])
def test_to_pandas(self):
table = self.t_env.from_pandas(self.pdf, self.data_type)
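
The expected-string updates here, like the timestamp changes throughout, reflect that results are now backed by java.time values whose toString() is ISO-8601 with minimal precision, instead of java.sql.Timestamp. Python's datetime prints the same shape:

    import datetime

    dt = datetime.datetime(1970, 1, 1, 0, 0, 0, 123000)
    # ISO-8601, matching LocalDateTime.toString(): 1970-01-01T00:00:00.123
    print(dt.isoformat(timespec='milliseconds'))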
diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index ce65d5715b1..71cd6380f41 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -45,13 +45,10 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [DataTypes.TINYINT(), DataTypes.FLOAT(),
- DataTypes.ROW(
- [DataTypes.FIELD("a", DataTypes.INT()),
- DataTypes.FIELD("b", DataTypes.INT())])])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b FLOAT, c ROW<a INT, b INT>) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
# general udf
add = udf(lambda a: a + 1, result_type=DataTypes.INT())
# pandas udf
@@ -78,12 +75,12 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a'],
- [DataTypes.INT()])
+ sink_table_ddl = """
+ CREATE TABLE Results(a INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
min_add = udaf(lambda a, b, c: a.min() + b.min() + c.min(),
result_type=DataTypes.INT(), func_type="pandas")
- self.t_env.register_table_sink("Results", table_sink)
t.select(min_add(t.a, t.b, t.c)) \
.execute_insert("Results") \
.wait()
@@ -98,10 +95,11 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [DataTypes.TINYINT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.INT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b INT, c FLOAT, d INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.get_config().set('python.metric.enabled', 'true')
self.t_env.register_function("max_add", udaf(MaxAdd(),
result_type=DataTypes.INT(),
@@ -133,15 +131,10 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT()),
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.FLOAT()
- ])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TIMESTAMP(3), b TIMESTAMP(3), c FLOAT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
tumble_window = Tumble.over(lit(1).hours) \
.on(col("rowtime")) \
@@ -154,8 +147,8 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.2]",
- "+I[2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0]"])
+ ["+I[2018-03-11T03:00, 2018-03-11T04:00, 2.2]",
+ "+I[2018-03-11T04:00, 2018-03-11T05:00, 8.0]"])
def test_slide_group_window_aggregate_function(self):
import datetime
@@ -174,17 +167,11 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT()),
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.FLOAT(),
- DataTypes.INT()
- ])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d FLOAT, e INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.register_function("max_add", udaf(MaxAdd(),
result_type=DataTypes.INT(),
func_type="pandas"))
@@ -204,15 +191,15 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0, 6]",
- "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.5, 7]",
- "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 5.5, 14]",
- "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0, 14]",
- "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1.0, 4]",
- "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0, 10]",
- "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 3.0, 10]",
- "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0, 7]",
- "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0, 7]"])
+ ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 6]",
+ "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5, 7]",
+ "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5, 14]",
+ "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0, 14]",
+ "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0, 4]",
+ "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 10]",
+ "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0, 10]",
+ "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 7]",
+ "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 7]"])
def test_over_window_aggregate_function(self):
import datetime
@@ -230,13 +217,12 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT()),
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'],
- [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT(), DataTypes.FLOAT(),
- DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(),
- DataTypes.FLOAT(), DataTypes.FLOAT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a TINYINT, b FLOAT, c INT, d FLOAT, e FLOAT, f FLOAT, g FLOAT, h FLOAT, i FLOAT,
+ j FLOAT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
self.t_env.register_function("max_add", udaf(MaxAdd(),
result_type=DataTypes.INT(),
@@ -325,15 +311,11 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
""" % source_path
self.t_env.execute_sql(source_table)
t = self.t_env.from_path("source_table")
-
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.FLOAT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d FLOAT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Slide.over(lit(1).hours)
.every(lit(30).minutes)
.on(col("rowtime"))
@@ -344,15 +326,15 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0]",
- "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.5]",
- "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 5.5]",
- "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0]",
- "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1.0]",
- "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0]",
- "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 3.0]",
- "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0]",
- "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0]"])
+ ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]",
+ "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5]",
+ "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5]",
+ "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0]",
+ "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0]",
+ "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
+ "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0]",
+ "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
+ "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]"])
os.remove(source_path)
def test_sliding_group_window_over_proctime(self):
@@ -429,13 +411,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
""" % source_path
self.t_env.execute_sql(source_table)
t = self.t_env.from_path("source_table")
-
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.FLOAT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Slide.over(row_interval(2))
.every(row_interval(1))
.on(t.protime)
@@ -489,15 +468,12 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
self.t_env.execute_sql(source_table)
t = self.t_env.from_path("source_table")
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.FLOAT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d TIMESTAMP(3), e FLOAT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
.group_by(t.a, t.b, col("w")) \
.select(t.a,
@@ -509,10 +485,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual, [
- "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.5]",
- "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 2018-03-11 04:59:59.999, 8.0]",
- "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.0]",
- "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.0]",
+ "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.5]",
+ "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 2018-03-11T04:59:59.999, 8.0]",
+ "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
+ "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
])
os.remove(source_path)
@@ -559,12 +535,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
self.t_env.execute_sql(source_table)
t = self.t_env.from_path("source_table")
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.FLOAT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Tumble.over(row_interval(2)).on(t.protime).alias("w")) \
.group_by(t.a, t.b, col("w")) \
.select(t.a, mean_udaf(t.c).alias("b")) \
@@ -613,13 +587,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
)
""" % source_path
self.t_env.execute_sql(source_table)
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [
- DataTypes.TINYINT(),
- DataTypes.FLOAT(),
- DataTypes.SMALLINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.execute_sql("""
insert into Results
select a,
@@ -681,13 +652,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
)
""" % source_path
self.t_env.execute_sql(source_table)
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [
- DataTypes.TINYINT(),
- DataTypes.FLOAT(),
- DataTypes.SMALLINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.execute_sql("""
insert into Results
select a,
@@ -749,13 +717,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
)
""" % source_path
self.t_env.execute_sql(source_table)
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [
- DataTypes.TINYINT(),
- DataTypes.FLOAT(),
- DataTypes.SMALLINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
self.t_env.execute_sql("""
insert into Results
select a,
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 415b23d4690..71a3f1cb782 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -45,10 +45,10 @@ class PandasUDFITTests(object):
# general Python UDF
subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT, d BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.where(add_one(t.b) <= 3) \
@@ -208,17 +208,14 @@ class PandasUDFITTests(object):
'row_param.f4 of wrong type %s !' % type(row_param.f4[0])
return row_param
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q',
- 'r', 's', 't', 'u'],
- [DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.BIGINT(),
- DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.FLOAT(), DataTypes.DOUBLE(),
- DataTypes.STRING(), DataTypes.STRING(), DataTypes.BYTES(), DataTypes.DECIMAL(38, 18),
- DataTypes.DECIMAL(38, 18), DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(3),
- DataTypes.ARRAY(DataTypes.STRING()), DataTypes.ARRAY(DataTypes.TIMESTAMP(3)),
- DataTypes.ARRAY(DataTypes.INT()),
- DataTypes.ARRAY(DataTypes.STRING()), row_type])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a TINYINT, b SMALLINT, c INT, d BIGINT, e BOOLEAN, f BOOLEAN, g FLOAT, h DOUBLE, i STRING,
+ j STRING, k BYTES, l DECIMAL(38, 18), m DECIMAL(38, 18), n DATE, o TIME, p TIMESTAMP(3),
+ q ARRAY<STRING>, r ARRAY<TIMESTAMP(3)>, s ARRAY<INT>, t ARRAY<STRING>,
+ u ROW<f1 INT, f2 STRING, f3 TIMESTAMP(3), f4 ARRAY<INT>>) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements(
[(1, 32767, -2147483648, 1, True, False, 1.0, 1.0, 'hello', '中文',
@@ -279,8 +276,8 @@ class PandasUDFITTests(object):
["+I[1, 32767, -2147483648, 1, true, false, 1.0, 1.0, hello, 中文, "
"[102, 108, 105, 110, 107], 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999, 2014-09-13, 01:00:01, "
- "1970-01-02 00:00:00.123, [hello, 中文, null], [1970-01-02 00:00:00.123], "
- "[1, 2], [hello, 中文, null], +I[1, hello, 1970-01-02 00:00:00.123, [1, 2]]]"])
+ "1970-01-02T00:00:00.123, [hello, 中文, null], [1970-01-02T00:00:00.123], "
+ "[1, 2], [hello, 中文, null], +I[1, hello, 1970-01-02T00:00:00.123, [1, 2]]]"])
def test_invalid_pandas_udf(self):
@@ -324,9 +321,10 @@ class PandasUDFITTests(object):
(local_zoned_timestamp_param[0], local_datetime)
return local_zoned_timestamp_param
- table_sink = source_sink_utils.TestAppendSink(
- ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TIMESTAMP_LTZ(3)) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements(
[(local_datetime,)],
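
TIMESTAMP_LTZ(3) in the DDL is the SQL shorthand for TIMESTAMP(3) WITH LOCAL TIME ZONE, i.e. the DDL spelling of the DataType the removed sink declared:

    from pyflink.table import DataTypes

    # Equivalent declarations of the sink column's type:
    #   DDL:     a TIMESTAMP_LTZ(3)
    #   Python:  DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)
    ltz = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)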
diff --git a/flink-python/pyflink/table/tests/test_row_based_operation.py b/flink-python/pyflink/table/tests/test_row_based_operation.py
index 1c8734b1df8..8c99c381a99 100644
--- a/flink-python/pyflink/table/tests/test_row_based_operation.py
+++ b/flink-python/pyflink/table/tests/test_row_based_operation.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import unittest
+
from pandas.util.testing import assert_frame_equal
from pyflink.common import Row
@@ -35,10 +37,10 @@ class RowBasedOperationTests(object):
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'],
- [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
func = udf(lambda x: Row(a=x + 1, b=x * x), result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
@@ -66,10 +68,10 @@ class RowBasedOperationTests(object):
DataTypes.ROW([DataTypes.FIELD("c", DataTypes.INT()),
DataTypes.FIELD("d", DataTypes.INT())]))]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'],
- [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
def func(x):
import pandas as pd
@@ -113,11 +115,11 @@ class RowBasedOperationTests(object):
[DataTypes.FIELD("a", DataTypes.TINYINT()),
DataTypes.FIELD("b", DataTypes.STRING())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f'],
- [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),
- DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d STRING, e BIGINT, f STRING)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x):
@@ -146,10 +148,10 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b FLOAT, c INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.a.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
@@ -172,10 +174,10 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
DataTypes.FIELD("b", DataTypes.SMALLINT()),
DataTypes.FIELD("c", DataTypes.INT())]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'],
- [DataTypes.FLOAT(), DataTypes.INT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a FLOAT, b INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
pandas_udaf = udaf(lambda pd: Row(pd.b.mean(), pd.b.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
@@ -189,6 +191,7 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[3.8, 8]"])
+ @unittest.skip("Not supported yet")
def test_window_aggregate_with_pandas_udaf(self):
import datetime
from pyflink.table.window import Tumble
@@ -207,14 +210,11 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
DataTypes.FIELD("c", DataTypes.INT()),
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [
- DataTypes.TIMESTAMP(3),
- DataTypes.FLOAT(),
- DataTypes.INT()
- ])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TIMESTAMP(3), b FLOAT, c INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
+ print(t.get_schema())
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 6158321efdb..5b51f62ee98 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,7 +21,7 @@ import subprocess
from pyflink.find_flink_home import _find_flink_source_root
from pyflink.java_gateway import get_gateway
-from pyflink.table import DataTypes, ResultKind
+from pyflink.table import ResultKind
from pyflink.table import expressions as expr
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
@@ -42,11 +42,10 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
def test_sql_query(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
+ sink_table_ddl = """
+ CREATE TABLE sinks(a BIGINT, b STRING, c STRING) WITH ('connector'='test-sink')
+ """
+ t_env.execute_sql(sink_table_ddl)
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.execute_insert("sinks").wait()
@@ -77,11 +76,10 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
table_result.print()
- field_names = ["k1", "k2", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
+ sink_table_ddl = """
+ CREATE TABLE sinks(k1 BIGINT, k2 INT, c STRING) WITH ('connector'='test-sink')
+ """
+ t_env.execute_sql(sink_table_ddl)
table_result = t_env.execute_sql("insert into sinks select * from tbl")
from pyflink.common.job_status import JobStatus
from py4j.protocol import Py4JJavaError
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a3732ced477..4b3f47daadd 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -89,10 +89,11 @@ class TableEnvironmentTest(object):
"python_scalar_func", udf(lambda i: i, result_type=DataTypes.INT()))
t_env.register_java_function("scalar_func",
- "org.apache.flink.table.legacyutils.RichFunc0")
+ "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
t_env.register_java_function(
- "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
- t_env.register_java_function("table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+ "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
+ t_env.register_java_function("table_func",
+ "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
actual = t_env.list_user_defined_functions()
expected = ['python_scalar_func', 'scalar_func', 'agg_func', 'table_func']
@@ -157,11 +158,11 @@ class TableEnvironmentTest(object):
t_env = self.t_env
t_env.create_java_temporary_system_function(
- "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+ "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
t_env.create_java_function(
- "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+ "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
t_env.create_java_temporary_function(
- "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+ "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
self.assert_equals(t_env.list_user_defined_functions(),
['scalar_func', 'agg_func', 'table_func'])
@@ -280,10 +281,14 @@ class DataStreamConversionTestCases(PyFlinkTestCase):
Types.STRING()]))
t_env = self.t_env
table = t_env.from_data_stream(ds)
- field_names = ['a', 'b', 'c']
- field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink("Sink",
- source_sink_utils.TestAppendSink(field_names, field_types))
+ sink_table_ddl = """
+ CREATE TABLE Sink(a INT, b STRING, c STRING) WITH ('connector'='test-sink')
+ """
+ t_env.execute_sql(sink_table_ddl)
+ expr_sink_ddl = """
+ CREATE TABLE ExprSink(a INT, b STRING, c STRING) WITH ('connector'='test-sink')
+ """
+ t_env.execute_sql(expr_sink_ddl)
table.execute_insert("Sink").wait()
result = source_sink_utils.results()
expected = ['+I[1, Hi, Hello]', '+I[2, Hello, Hi]']
@@ -292,8 +297,6 @@ class DataStreamConversionTestCases(PyFlinkTestCase):
ds = ds.map(lambda x: x, Types.ROW([Types.INT(), Types.STRING(), Types.STRING()])) \
.map(lambda x: x, Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
table = t_env.from_data_stream(ds, col('a'), col('b'), col('c'))
- t_env.register_table_sink("ExprSink",
- source_sink_utils.TestAppendSink(field_names, field_types))
table.execute_insert("ExprSink").wait()
result = source_sink_utils.results()
self.assert_equals(result, expected)
@@ -631,13 +634,13 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
t_env = self.t_env
t_env.register_java_function(
- "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+ "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
t_env.register_java_function(
- "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+ "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
t_env.register_java_function(
- "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+ "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
actual = t_env.list_user_defined_functions()
expected = ['scalar_func', 'agg_func', 'table_func']
@@ -709,11 +712,11 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
t_env = self.t_env
t_env.create_java_temporary_system_function(
- "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+ "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
t_env.create_java_function(
- "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+ "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
t_env.create_java_temporary_function(
- "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+ "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
self.assert_equals(t_env.list_user_defined_functions(),
['scalar_func', 'agg_func', 'table_func'])
diff --git a/flink-python/pyflink/table/tests/test_types.py b/flink-python/pyflink/table/tests/test_types.py
index 7520fd68a7a..d3bd37c6ffd 100644
--- a/flink-python/pyflink/table/tests/test_types.py
+++ b/flink-python/pyflink/table/tests/test_types.py
@@ -34,8 +34,8 @@ from pyflink.table.types import (_infer_schema_from_data, _infer_type,
_array_type_mappings, _merge_type,
_create_type_verifier, UserDefinedType, DataTypes, Row, RowField,
RowType, ArrayType, BigIntType, VarCharType, MapType, DataType,
- _to_java_type, _from_java_type, ZonedTimestampType,
- LocalZonedTimestampType)
+ _from_java_data_type, ZonedTimestampType,
+ LocalZonedTimestampType, _to_java_data_type)
from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -816,9 +816,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
DataTypes.TIME(),
DataTypes.TIMESTAMP(3)]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
@@ -833,7 +833,7 @@ class DataTypeConvertTests(PyFlinkTestCase):
JDataTypes.CHAR(50).notNull(),
JDataTypes.DECIMAL(20, 10).notNull()]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
expected = [DataTypes.TIME(3, False),
DataTypes.TIMESTAMP(3).not_null(),
@@ -844,22 +844,6 @@ class DataTypeConvertTests(PyFlinkTestCase):
DataTypes.DECIMAL(20, 10, False)]
self.assertEqual(converted_python_types, expected)
- # Legacy type tests
- Types = gateway.jvm.org.apache.flink.table.api.Types
- InternalBigDecimalTypeInfo = \
- gateway.jvm.org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo
-
- java_types = [Types.STRING(),
- Types.DECIMAL(),
- InternalBigDecimalTypeInfo(12, 5)]
-
- converted_python_types = [_from_java_type(item) for item in java_types]
-
- expected = [DataTypes.VARCHAR(2147483647),
- DataTypes.DECIMAL(38, 18),
- DataTypes.DECIMAL(12, 5)]
- self.assertEqual(converted_python_types, expected)
-
def test_array_type(self):
# nullable/not_null flag will be lost during the conversion.
test_types = [DataTypes.ARRAY(DataTypes.BIGINT()),
@@ -868,9 +852,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT())),
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
@@ -880,9 +864,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
DataTypes.MULTISET(DataTypes.MULTISET(DataTypes.BIGINT())),
DataTypes.MULTISET(DataTypes.MULTISET(DataTypes.STRING()))]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
@@ -894,9 +878,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
DataTypes.MAP(DataTypes.STRING(),
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
@@ -907,9 +891,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
[DataTypes.FIELD("c",
DataTypes.STRING())]))])]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
@@ -917,9 +901,19 @@ class DataTypeConvertTests(PyFlinkTestCase):
test_types = [DataTypes.LIST_VIEW(DataTypes.BIGINT()),
DataTypes.LIST_VIEW(DataTypes.STRING())]
- java_types = [_to_java_type(item) for item in test_types]
+ java_types = [_to_java_data_type(item) for item in test_types]
+
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
+
+ self.assertEqual(test_types, converted_python_types)
+
+ def test_map_view_type(self):
+ test_types = [DataTypes.MAP_VIEW(DataTypes.STRING(), DataTypes.BIGINT()),
+ DataTypes.MAP_VIEW(DataTypes.INT(), DataTypes.STRING())]
+
+ java_types = [_to_java_data_type(item) for item in test_types]
- converted_python_types = [_from_java_type(item) for item in java_types]
+ converted_python_types = [_from_java_data_type(item) for item in java_types]
self.assertEqual(test_types, converted_python_types)
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index 59615092f1e..e6f96527ff7 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -570,15 +570,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
t = self.t_env.from_path("source_table")
from pyflink.testing import source_sink_utils
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.BIGINT(),
- DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT, e BIGINT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
.group_by(t.a, col("w")) \
.select(t.a,
@@ -590,10 +586,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2, 1]",
- "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 1, 1]",
- "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2, 2]",
- "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 1, 1]"])
+ ["+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2, 1]",
+ "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 1, 1]",
+ "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2, 2]",
+ "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 1, 1]"])
def test_tumbling_group_window_over_count(self):
self.t_env.get_config().set("parallelism.default", "1")
@@ -633,12 +629,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
t = self.t_env.from_path("source_table")
from pyflink.testing import source_sink_utils
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, d BIGINT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Tumble.over(row_interval(2)).on(t.protime).alias("w")) \
.group_by(t.a, col("w")) \
.select(t.a, call("my_sum", t.c).alias("b")) \
@@ -684,14 +679,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
t = self.t_env.from_path("source_table")
from pyflink.testing import source_sink_utils
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Slide.over(lit(1).hours)
.every(lit(30).minutes)
.on(t.rowtime)
@@ -702,15 +694,15 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2]",
- "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1]",
- "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2]",
- "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 5]",
- "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2]",
- "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2]",
- "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 1]",
- "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 11]",
- "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8]"])
+ ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2]",
+ "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1]",
+ "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2]",
+ "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 5]",
+ "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2]",
+ "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2]",
+ "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 1]",
+ "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 11]",
+ "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8]"])
def test_sliding_group_window_over_count(self):
self.t_env.get_config().set("parallelism.default", "1")
@@ -750,12 +742,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
t = self.t_env.from_path("source_table")
from pyflink.testing import source_sink_utils
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, d BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Slide.over(row_interval(2)).every(row_interval(1)).on(t.protime).alias("w")) \
.group_by(t.a, col("w")) \
.select(t.a, call("my_sum", t.c).alias("b")) \
@@ -801,14 +791,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
t = self.t_env.from_path("source_table")
from pyflink.testing import source_sink_utils
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [
- DataTypes.TINYINT(),
- DataTypes.TIMESTAMP(3),
- DataTypes.TIMESTAMP(3),
- DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t.window(Session.with_gap(lit(30).minutes).on(t.rowtime).alias("w")) \
.group_by(t.a, t.b, col("w")) \
.select(t.a, col("w").start, col("w").end, call("my_count", t.c).alias("c")) \
@@ -816,10 +803,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual,
- ["+I[3, 2018-03-11 03:10:00.0, 2018-03-11 03:40:00.0, 1]",
- "+I[2, 2018-03-11 03:10:00.0, 2018-03-11 04:00:00.0, 2]",
- "+I[1, 2018-03-11 03:10:00.0, 2018-03-11 04:10:00.0, 2]",
- "+I[1, 2018-03-11 04:20:00.0, 2018-03-11 04:50:00.0, 1]"])
+ ["+I[3, 2018-03-11T03:10, 2018-03-11T03:40, 1]",
+ "+I[2, 2018-03-11T03:10, 2018-03-11T04:00, 2]",
+ "+I[1, 2018-03-11T03:10, 2018-03-11T04:10, 2]",
+ "+I[1, 2018-03-11T04:20, 2018-03-11T04:50, 1]"])
@unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_group_aggregate_from_json_plan(self):
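The expected strings change shape because window boundaries now arrive as
java.time.LocalDateTime (rendered ISO-style, '2018-03-11T03:00') instead of
java.sql.Timestamp ('2018-03-11 03:00:00.0'). A compact sketch of the windowed
aggregation these tests exercise, assuming a UDAF 'my_count' is registered as in the
surrounding tests:

    self.t_env.execute_sql("""
        CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT)
        WITH ('connector' = 'test-sink')
    """)
    t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
        .group_by(t.a, col("w")) \
        .select(t.a, col("w").start, col("w").end, call("my_count", t.c).alias("d")) \
        .execute_insert("Results").wait()
    # window bounds now render as e.g. '+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2]'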
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 91e418e9083..c53f8cbe08c 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -60,11 +60,11 @@ class UserDefinedFunctionTests(object):
assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
return 1
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f', 'g'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(),
- DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT, d BIGINT, e BIGINT, f BIGINT,
+ g BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
execution_mode = self.t_env.get_config().get("python.execution-mode", "process")
@@ -80,10 +80,10 @@ class UserDefinedFunctionTests(object):
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT, c INT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1)) \
@@ -97,11 +97,11 @@ class UserDefinedFunctionTests(object):
f = udf(lambda i: i, result_type=DataTypes.BIGINT())
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
- self.t_env.register_table_sink("Results", table_sink)
-
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d STRING)
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t1.join(t2).where(f(t1.a) == t2.c).execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[2, Hi, 2, Flink]"])
@@ -112,10 +112,10 @@ class UserDefinedFunctionTests(object):
f = udf(lambda i: i, result_type=DataTypes.BIGINT())
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd'],
- [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d STRING) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t1.join(t2).where(f(t1.a) == f(t2.c)).execute_insert("Results").wait()
actual = source_sink_utils.results()
@@ -180,9 +180,10 @@ class UserDefinedFunctionTests(object):
"udf_with_all_constant_params", udf(lambda i, j: i + j,
result_type=DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(['a', 'b'],
- [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
self.t_env.register_table("test_table", t)
@@ -212,8 +213,10 @@ class UserDefinedFunctionTests(object):
"plus", udf(lambda i, j: i + j - 1,
result_type=DataTypes.BIGINT()))
- table_sink = source_sink_utils.TestAppendSink(['a'], [DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.select(t.a + t.b).execute_insert("Results").wait()
@@ -229,9 +232,10 @@ class UserDefinedFunctionTests(object):
else:
subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 4)], ['a', 'b'])
t.select(t.a, subtract(t.b)).execute_insert("Results").wait()
@@ -242,9 +246,10 @@ class UserDefinedFunctionTests(object):
one = udf(lambda: 1, result_type=DataTypes.BIGINT(), deterministic=True)
two = udf(lambda: 2, result_type=DataTypes.BIGINT(), deterministic=False)
- table_sink = source_sink_utils.TestAppendSink(['a', 'b'],
- [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(one(), two()).execute_insert("Results").wait()
@@ -360,16 +365,13 @@ class UserDefinedFunctionTests(object):
'decimal_param is wrong value %s !' % decimal_param
return decimal_param
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.TINYINT(),
- DataTypes.BOOLEAN(), DataTypes.SMALLINT(), DataTypes.INT(),
- DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.BYTES(),
- DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIME(),
- DataTypes.TIMESTAMP(3), DataTypes.ARRAY(DataTypes.BIGINT()),
- DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()),
- DataTypes.DECIMAL(38, 18), DataTypes.DECIMAL(38, 18)])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE, i BYTES,
+ j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o MAP<BIGINT, STRING>,
+ p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
import datetime
import decimal
@@ -422,8 +424,8 @@ class UserDefinedFunctionTests(object):
# Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
self.assert_equals(actual,
["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
- "[102, 108, 105, 110, 107], pyflink, 2014-09-13, "
- "12:00:00, 2018-03-11 03:00:00.123, [1, 2, 3], "
+ "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
+ "2018-03-11T03:00:00.123, [1, 2, 3], "
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999]"])
@@ -571,16 +573,14 @@ class UserDefinedFunctionTests(object):
self.t_env.register_function(
"decimal_cut_func", udf(decimal_cut_func, result_type=DataTypes.DECIMAL(38, 18)))
- table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.TINYINT(),
- DataTypes.BOOLEAN(), DataTypes.SMALLINT(), DataTypes.INT(),
- DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.BYTES(),
- DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIME(),
- DataTypes.TIMESTAMP(3), DataTypes.ARRAY(DataTypes.BIGINT()),
- DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()),
- DataTypes.DECIMAL(38, 18), DataTypes.DECIMAL(38, 18)])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(
+ a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE,
+ i BYTES, j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>,
+ o MAP<BIGINT, STRING>, p DECIMAL(38, 18), q DECIMAL(38, 18))
+ WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
import datetime
import decimal
@@ -625,7 +625,7 @@ class UserDefinedFunctionTests(object):
self.assert_equals(actual,
["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
"[102, 108, 105, 110, 107], pyflink, 2014-09-13, "
- "12:00:00, 2018-03-11 03:00:00.123, [1, 2, 3], "
+ "12:00:00.123, 2018-03-11T03:00:00.123, [1, 2, 3], "
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999]"])
@@ -726,9 +726,10 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
'local_zoned_timestamp_param is wrong value %s !' % local_zoned_timestamp_param
return local_zoned_timestamp_param
- table_sink = source_sink_utils.TestAppendSink(
- ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
- self.t_env.register_table_sink("Results", table_sink)
+ sink_table_ddl = """
+ CREATE TABLE Results(a TIMESTAMP_LTZ(3)) WITH ('connector'='test-sink')
+ """
+ self.t_env.execute_sql(sink_table_ddl)
t = self.t_env.from_elements(
[(local_datetime,)],
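The final hunk swaps the legacy sink for a TIMESTAMP_LTZ(3) column declared in DDL. A
minimal sketch of a UDF writing a local-zoned timestamp through it (the identity UDF and
the row schema are illustrative; the real test asserts on the parameter value):

    self.t_env.execute_sql(
        "CREATE TABLE Results(a TIMESTAMP_LTZ(3)) WITH ('connector' = 'test-sink')")
    identity = udf(lambda ts: ts,
                   result_type=DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
    t = self.t_env.from_elements(
        [(local_datetime,)],
        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]))
    t.select(identity(t.a)).execute_insert("Results").wait()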
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 9b1a011995f..087d56de200 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -28,9 +28,8 @@ from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
class UserDefinedTableFunctionTests(object):
def test_table_function(self):
- self._register_table_sink(
- ['a', 'b', 'c'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
+ self._register_table_sink("""CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT)
+ WITH ('connector'='test-sink')""")
multi_emit = udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
multi_num = udf(MultiNum(), result_type=DataTypes.BIGINT())
@@ -48,9 +47,8 @@ class UserDefinedTableFunctionTests(object):
"+I[3, 1, 2]", "+I[3, 2, 2]", "+I[3, 3, null]"])
def test_table_function_with_sql_query(self):
- self._register_table_sink(
- ['a', 'b', 'c'],
- [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
+ self._register_table_sink("""CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT)
+ WITH ('connector'='test-sink')""")
self.t_env.create_temporary_system_function(
"multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]))
@@ -63,9 +61,8 @@ class UserDefinedTableFunctionTests(object):
actual = self._get_output(t)
self.assert_equals(actual, ["+I[1, 1, 0]", "+I[2, 2, 0]", "+I[3, 3, 0]", "+I[3, 3, 1]"])
- def _register_table_sink(self, field_names: list, field_types: list):
- table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
- self.t_env.register_table_sink("Results", table_sink)
+ def _register_table_sink(self, ddl: str):
+ self.t_env.execute_sql(ddl)
def _get_output(self, t):
t.execute_insert("Results").wait()
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 44458da9b3c..add5b042f7d 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1681,161 +1681,15 @@ _primitive_array_element_types = {BooleanType, TinyIntType, SmallIntType, IntTyp
FloatType, DoubleType}
-def _to_java_type(data_type):
+def _from_java_data_type(j_data_type):
"""
- Converts Python type to Java TypeInformation.
- """
-
- global _python_java_types_mapping
- global _python_java_types_mapping_lock
-
- gateway = get_gateway()
- Types = gateway.jvm.org.apache.flink.table.api.Types
-
- if _python_java_types_mapping is None:
- with _python_java_types_mapping_lock:
- _python_java_types_mapping = {
- BooleanType: Types.BOOLEAN(),
- TinyIntType: Types.BYTE(),
- SmallIntType: Types.SHORT(),
- IntType: Types.INT(),
- BigIntType: Types.LONG(),
- FloatType: Types.FLOAT(),
- DoubleType: Types.DOUBLE(),
- DateType: Types.SQL_DATE(),
- }
-
- # basic types
- if type(data_type) in _python_java_types_mapping:
- return _python_java_types_mapping[type(data_type)]
-
- # DecimalType
- elif isinstance(data_type, DecimalType):
- if data_type.precision == 38 and data_type.scale == 18:
- return Types.DECIMAL()
- else:
- raise TypeError("The precision must be 38 and the scale must be 18 for DecimalType, "
- "got %s" % repr(data_type))
-
- # TimeType
- elif isinstance(data_type, TimeType):
- if data_type.precision == 0:
- return Types.SQL_TIME()
- else:
- raise TypeError("The precision must be 0 for TimeType, got %s" % repr(data_type))
-
- # TimestampType
- elif isinstance(data_type, TimestampType):
- if data_type.precision == 3:
- return Types.SQL_TIMESTAMP()
- else:
- raise TypeError("The precision must be 3 for TimestampType, got %s" % repr(data_type))
-
- # LocalZonedTimestampType
- elif isinstance(data_type, LocalZonedTimestampType):
- if data_type.precision == 3:
- return gateway.jvm.org.apache.flink.api.common.typeinfo.Types.INSTANT
- else:
- raise TypeError("The precision must be 3 for LocalZonedTimestampType, got %s"
- % repr(data_type))
-
- # VarCharType
- elif isinstance(data_type, VarCharType):
- if data_type.length == 0x7fffffff:
- return Types.STRING()
- else:
- raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarCharType, "
- "got %s" % repr(data_type))
-
- # VarBinaryType
- elif isinstance(data_type, VarBinaryType):
- if data_type.length == 0x7fffffff:
- return Types.PRIMITIVE_ARRAY(Types.BYTE())
- else:
- raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarBinaryType, "
- "got %s" % repr(data_type))
-
- # YearMonthIntervalType
- elif isinstance(data_type, YearMonthIntervalType):
- if data_type.resolution == YearMonthIntervalType.YearMonthResolution.MONTH and \
- data_type.precision == 2:
- return Types.INTERVAL_MONTHS()
- else:
- raise TypeError("The resolution must be YearMonthResolution.MONTH and the precision "
- "must be 2 for YearMonthIntervalType, got %s" % repr(data_type))
-
- # DayTimeIntervalType
- elif isinstance(data_type, DayTimeIntervalType):
- if data_type.resolution == DayTimeIntervalType.DayTimeResolution.SECOND and \
- data_type.day_precision == 2 and data_type.fractional_precision == 3:
- return Types.INTERVAL_MILLIS()
- else:
- raise TypeError("The resolution must be DayTimeResolution.SECOND, the day_precision "
- "must be 2 and the fractional_precision must be 3 for "
- "DayTimeIntervalType, got %s" % repr(data_type))
-
- # ArrayType
- elif isinstance(data_type, ArrayType):
- return Types.OBJECT_ARRAY(_to_java_type(data_type.element_type))
-
- # ListViewType
- elif isinstance(data_type, ListViewType):
- return gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo(_to_java_type(
- data_type._element_type))
-
- # MapType
- elif isinstance(data_type, MapType):
- return Types.MAP(_to_java_type(data_type.key_type), _to_java_type(data_type.value_type))
-
- # MapViewType
- elif isinstance(data_type, MapViewType):
- return gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo(
- _to_java_type(data_type._key_type), _to_java_type(data_type._value_type))
-
- # MultisetType
- elif isinstance(data_type, MultisetType):
- return Types.MULTISET(_to_java_type(data_type.element_type))
-
- # RowType
- elif isinstance(data_type, RowType):
- return Types.ROW(
- to_jarray(gateway.jvm.String, data_type.field_names()),
- to_jarray(gateway.jvm.TypeInformation,
- [_to_java_type(f.data_type) for f in data_type.fields]))
-
- # UserDefinedType
- elif isinstance(data_type, UserDefinedType):
- if data_type.java_udt():
- return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
- gateway.jvm.Class.forName(
- data_type.java_udt(),
- True,
- gateway.jvm.Thread.currentThread().getContextClassLoader()))
- else:
- return _to_java_type(data_type.sql_type())
-
- else:
- raise TypeError("Not supported type: %s" % repr(data_type))
-
-
-def _from_java_type(j_data_type):
- """
- Converts Java TypeInformation to Python DataType.
+ Converts Java DataType to Python DataType.
"""
gateway = get_gateway()
- if is_instance_of(j_data_type, gateway.jvm.TypeInformation):
- # input is TypeInformation
- LegacyTypeInfoDataTypeConverter = \
- gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
- java_data_type = LegacyTypeInfoDataTypeConverter.toDataType(j_data_type)
- else:
- # input is DataType
- java_data_type = j_data_type
-
# Atomic Type with parameters.
- if is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
- logical_type = java_data_type.getLogicalType()
+ if is_instance_of(j_data_type, gateway.jvm.AtomicDataType):
+ logical_type = j_data_type.getLogicalType()
if is_instance_of(logical_type, gateway.jvm.CharType):
data_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
elif is_instance_of(logical_type, gateway.jvm.VarCharType):
@@ -1892,12 +1746,12 @@ def _from_java_type(j_data_type):
data_type = DataTypes.DECIMAL(type_info.precision(), type_info.scale())
elif type_info.getClass() == \
get_java_class(gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo):
- data_type = DataTypes.LIST_VIEW(_from_java_type(type_info.getElementType()))
+ data_type = DataTypes.LIST_VIEW(_from_java_data_type(type_info.getElementType()))
elif type_info.getClass() == \
get_java_class(gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo):
data_type = DataTypes.MAP_VIEW(
- _from_java_type(type_info.getKeyType()),
- _from_java_type(type_info.getValueType()))
+ _from_java_data_type(type_info.getKeyType()),
+ _from_java_data_type(type_info.getValueType()))
else:
raise TypeError("Unsupported type: %s, it is recognized as a legacy type."
% type_info)
@@ -1910,13 +1764,14 @@ def _from_java_type(j_data_type):
return data_type
# Array Type, MultiSet Type.
- elif is_instance_of(java_data_type, gateway.jvm.CollectionDataType):
- logical_type = java_data_type.getLogicalType()
- element_type = java_data_type.getElementDataType()
+ elif is_instance_of(j_data_type, gateway.jvm.CollectionDataType):
+ logical_type = j_data_type.getLogicalType()
+ element_type = j_data_type.getElementDataType()
if is_instance_of(logical_type, gateway.jvm.ArrayType):
- data_type = DataTypes.ARRAY(_from_java_type(element_type), logical_type.isNullable())
+ data_type = DataTypes.ARRAY(_from_java_data_type(element_type),
+ logical_type.isNullable())
elif is_instance_of(logical_type, gateway.jvm.MultisetType):
- data_type = DataTypes.MULTISET(_from_java_type(element_type),
+ data_type = DataTypes.MULTISET(_from_java_data_type(element_type),
logical_type.isNullable())
else:
raise TypeError("Unsupported collection data type: %s" % j_data_type)
@@ -1924,14 +1779,14 @@ def _from_java_type(j_data_type):
return data_type
# Map Type.
- elif is_instance_of(java_data_type, gateway.jvm.KeyValueDataType):
- logical_type = java_data_type.getLogicalType()
- key_type = java_data_type.getKeyDataType()
- value_type = java_data_type.getValueDataType()
+ elif is_instance_of(j_data_type, gateway.jvm.KeyValueDataType):
+ logical_type = j_data_type.getLogicalType()
+ key_type = j_data_type.getKeyDataType()
+ value_type = j_data_type.getValueDataType()
if is_instance_of(logical_type, gateway.jvm.MapType):
data_type = DataTypes.MAP(
- _from_java_type(key_type),
- _from_java_type(value_type),
+ _from_java_data_type(key_type),
+ _from_java_data_type(value_type),
logical_type.isNullable())
else:
raise TypeError("Unsupported map data type: %s" % j_data_type)
@@ -1939,13 +1794,21 @@ def _from_java_type(j_data_type):
return data_type
# Row Type.
- elif is_instance_of(java_data_type, gateway.jvm.FieldsDataType):
- logical_type = java_data_type.getLogicalType()
- field_data_types = java_data_type.getChildren()
+ elif is_instance_of(j_data_type, gateway.jvm.FieldsDataType):
+ logical_type = j_data_type.getLogicalType()
+ field_data_types = j_data_type.getChildren()
if is_instance_of(logical_type, gateway.jvm.RowType):
- fields = [DataTypes.FIELD(name, _from_java_type(field_data_types[idx]))
+ fields = [DataTypes.FIELD(name, _from_java_data_type(field_data_types[idx]))
for idx, name in enumerate(logical_type.getFieldNames())]
data_type = DataTypes.ROW(fields, logical_type.isNullable())
+ elif j_data_type.getConversionClass().isAssignableFrom(
+ gateway.jvm.org.apache.flink.table.api.dataview.ListView._java_lang_class):
+ array_type = _from_java_data_type(field_data_types[0])
+ data_type = DataTypes.LIST_VIEW(array_type.element_type)
+ elif j_data_type.getConversionClass().isAssignableFrom(
+ gateway.jvm.org.apache.flink.table.api.dataview.MapView._java_lang_class):
+ map_type = _from_java_data_type(field_data_types[0])
+ data_type = DataTypes.MAP_VIEW(map_type.key_type, map_type.value_type)
else:
raise TypeError("Unsupported row data type: %s" % j_data_type)
@@ -2007,6 +1870,15 @@ def _to_java_data_type(data_type: DataType):
fields = [JDataTypes.FIELD(f.name, _to_java_data_type(f.data_type))
for f in data_type.fields]
j_data_type = JDataTypes.ROW(to_jarray(JDataTypes.Field, fields))
+ elif isinstance(data_type, UserDefinedType):
+ if data_type.java_udt():
+ return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
+ gateway.jvm.Class.forName(
+ data_type.java_udt(),
+ True,
+ gateway.jvm.Thread.currentThread().getContextClassLoader()))
+ else:
+ return _to_java_data_type(data_type.sql_type())
elif isinstance(data_type, MultisetType):
j_data_type = JDataTypes.MULTISET(_to_java_data_type(data_type.element_type))
elif isinstance(data_type, NullType):
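The new UserDefinedType branch mirrors what the deleted _to_java_type did: if the UDT
names a Java counterpart, that class is instantiated reflectively; otherwise conversion
falls through to the UDT's SQL type. A trimmed pseudocode sketch of the control flow
(instantiate_java_class is an illustrative stand-in for the InstantiationUtil call):

    def _convert_udt(data_type):
        if data_type.java_udt():                      # Java-side UDT class name, if any
            return instantiate_java_class(data_type.java_udt())
        return _to_java_data_type(data_type.sql_type())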
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 5d38617bc03..50ff1839b51 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -24,7 +24,7 @@ from typing import Union, List, Type, Callable, TypeVar, Generic, Iterable
from pyflink.java_gateway import get_gateway
from pyflink.metrics import MetricGroup
from pyflink.table import Expression
-from pyflink.table.types import DataType, _to_java_type, _to_java_data_type
+from pyflink.table.types import DataType, _to_java_data_type
from pyflink.util import java_utils
__all__ = ['FunctionContext', 'AggregateFunction', 'ScalarFunction', 'TableFunction',
@@ -378,7 +378,7 @@ class UserDefinedFunctionWrapper(object):
if self._input_types is not None:
j_input_types = java_utils.to_jarray(
- gateway.jvm.TypeInformation, [_to_java_type(i) for i in self._input_types])
+ gateway.jvm.DataType, [_to_java_data_type(i) for i in self._input_types])
else:
j_input_types = None
j_function_kind = get_python_function_kind()
@@ -416,7 +416,7 @@ class UserDefinedScalarFunctionWrapper(UserDefinedFunctionWrapper):
def _create_judf(self, serialized_func, j_input_types, j_function_kind):
gateway = get_gateway()
- j_result_type = _to_java_type(self._result_type)
+ j_result_type = _to_java_data_type(self._result_type)
PythonScalarFunction = gateway.jvm \
.org.apache.flink.table.functions.python.PythonScalarFunction
j_scalar_function = PythonScalarFunction(
@@ -458,9 +458,9 @@ class UserDefinedTableFunctionWrapper(UserDefinedFunctionWrapper):
def _create_judf(self, serialized_func, j_input_types, j_function_kind):
gateway = get_gateway()
- j_result_types = java_utils.to_jarray(gateway.jvm.TypeInformation,
- [_to_java_type(i) for i in self._result_types])
- j_result_type = gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(j_result_types)
+ j_result_types = java_utils.to_jarray(gateway.jvm.DataType,
+ [_to_java_data_type(i) for i in self._result_types])
+ j_result_type = gateway.jvm.DataTypes.ROW(j_result_types)
PythonTableFunction = gateway.jvm \
.org.apache.flink.table.functions.python.PythonTableFunction
j_table_function = PythonTableFunction(
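On the wrapper side, a table function's result_types are now assembled into a Java
DataTypes.ROW(...) instead of a RowTypeInfo, so user code is unchanged. A sketch of a
declaration that exercises this path (the splitting function is illustrative):

    split = udtf(lambda s: ((w, len(w)) for w in s.split(",")),
                 result_types=[DataTypes.STRING(), DataTypes.BIGINT()])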
diff --git a/flink-python/pyflink/testing/source_sink_utils.py b/flink-python/pyflink/testing/source_sink_utils.py
index e37fed9eb70..3f713bf49e1 100644
--- a/flink-python/pyflink/testing/source_sink_utils.py
+++ b/flink-python/pyflink/testing/source_sink_utils.py
@@ -24,7 +24,7 @@ from py4j.java_gateway import java_import
from pyflink.find_flink_home import _find_flink_source_root
from pyflink.java_gateway import get_gateway
from pyflink.table.sinks import TableSink
-from pyflink.table.types import _to_java_type
+from pyflink.table.types import _to_java_data_type
from pyflink.util import java_utils
@@ -35,13 +35,7 @@ class TestTableSink(TableSink):
_inited = False
- def __init__(self, j_table_sink, field_names, field_types):
- gateway = get_gateway()
- j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
- j_field_types = java_utils.to_jarray(
- gateway.jvm.TypeInformation,
- [_to_java_type(field_type) for field_type in field_types])
- j_table_sink = j_table_sink.configure(j_field_names, j_field_types)
+ def __init__(self, j_table_sink):
super(TestTableSink, self).__init__(j_table_sink)
@classmethod
@@ -57,10 +51,8 @@ class TestTableSink(TableSink):
"'flink-python*-tests.jar' is not available. Will skip the related tests.")
gateway = get_gateway()
- java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestAppendSink")
- java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestRetractSink")
- java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestUpsertSink")
- java_import(gateway.jvm, "org.apache.flink.table.legacyutils.RowCollector")
+ java_import(gateway.jvm, "org.apache.flink.table.utils.TestingSinks$TestAppendingSink")
+ java_import(gateway.jvm, "org.apache.flink.table.utils.TestingSinks$RowCollector")
TestTableSink._inited = True
@@ -72,10 +64,13 @@ class TestAppendSink(TestTableSink):
def __init__(self, field_names, field_types):
TestTableSink._ensure_initialized()
-
gateway = get_gateway()
- super(TestAppendSink, self).__init__(
- gateway.jvm.TestAppendSink(), field_names, field_types)
+ j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
+ j_field_types = java_utils.to_jarray(
+ gateway.jvm.DataType,
+ [_to_java_data_type(field_type) for field_type in field_types])
+ super(TestAppendSink, self).__init__(gateway.jvm.org.apache.flink.table.utils.TestingSinks
+ .TestAppendingSink(j_field_names, j_field_types))
class TestRetractSink(TestTableSink):
@@ -85,10 +80,13 @@ class TestRetractSink(TestTableSink):
def __init__(self, field_names, field_types):
TestTableSink._ensure_initialized()
-
gateway = get_gateway()
- super(TestRetractSink, self).__init__(
- gateway.jvm.TestRetractSink(), field_names, field_types)
+ j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
+ j_field_types = java_utils.to_jarray(
+ gateway.jvm.DataType,
+ [_to_java_data_type(field_type) for field_type in field_types])
+ super(TestRetractSink, self).__init__(gateway.jvm.org.apache.flink.table.utils.TestingSinks
+ .TestAppendingSink(j_field_names, j_field_types))
class TestUpsertSink(TestTableSink):
@@ -120,8 +118,9 @@ def retract_results():
Retrieves the results from a retract table sink.
"""
gateway = get_gateway()
- results = gateway.jvm.RowCollector.getAndClearValues()
- return gateway.jvm.RowCollector.retractResults(results)
+ results = gateway.jvm.org.apache.flink.table.utils.TestingSinks.RowCollector.getAndClearValues()
+ return gateway.jvm\
+ .org.apache.flink.table.utils.TestingSinks.RowCollector.retractResults(results)
def upsert_results(keys):
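With the sinks relocated, the RowCollector is addressed by its fully qualified name. A
sketch of how a test consumes retract results (the sink table name is illustrative):

    from pyflink.testing import source_sink_utils

    table.execute_insert("RetractResults").wait()
    actual = source_sink_utils.retract_results()   # folds +I/-U/+U/-D into final rows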
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 595167dd694..5bb30ae8f82 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.api.utils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BigIntSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
@@ -47,6 +49,7 @@ import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySeria
import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
@@ -55,6 +58,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
@@ -65,14 +70,34 @@ import org.apache.flink.table.runtime.typeutils.serializers.python.StringSeriali
import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
+import java.io.IOException;
+import java.lang.reflect.Array;
import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/** A util class for converting the given TypeInformation to other objects. */
@Internal
@@ -590,4 +615,510 @@ public class PythonTypeUtils {
typeInformation.toString()));
}
}
+
+ /**
+ * Wraps the unpickled Python data in an InputFormat. It will be passed to
+ * StreamExecutionEnvironment.createInput() to create a DataStream later.
+ *
+ * @param data The unpickled Python data.
+ * @param dataType The Python data type.
+ * @param config The execution config used to create the serializer.
+ * @return An InputFormat containing the python data.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> InputFormat<T, ?> getCollectionInputFormat(
+ final List<T> data, final TypeInformation<T> dataType, final ExecutionConfig config) {
+ Function<Object, Object> converter = converter(dataType, config);
+ return new CollectionInputFormat<>(
+ data.stream()
+ .map(objects -> (T) converter.apply(objects))
+ .collect(Collectors.toList()),
+ dataType.createSerializer(config));
+ }
+
+ private static BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor(
+ final TypeInformation<?> elementType, final boolean primitiveArray) {
+ if (elementType.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ boolean[] array = new boolean[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (boolean) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Boolean[] array = new Boolean[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Boolean) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ byte[] array = new byte[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (byte) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Byte[] array = new Byte[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Byte) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ short[] array = new short[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (short) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Short[] array = new Short[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Short) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.INT_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ int[] array = new int[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (int) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Integer[] array = new Integer[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Integer) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ long[] array = new long[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (long) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Long[] array = new Long[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Long) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ float[] array = new float[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (float) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Float[] array = new Float[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Float) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
+ if (primitiveArray) {
+ return (length, elementGetter) -> {
+ double[] array = new double[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (double) elementGetter.apply(i);
+ }
+ return array;
+ };
+ } else {
+ return (length, elementGetter) -> {
+ Double[] array = new Double[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Double) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ }
+ if (elementType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+ return (length, elementGetter) -> {
+ String[] array = new String[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (String) elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+ return (length, elementGetter) -> {
+ Object[] array = new Object[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = elementGetter.apply(i);
+ }
+ return array;
+ };
+ }
+
+ private static Function<Object, Object> converter(
+ final TypeInformation<?> dataType, final ExecutionConfig config) {
+ if (dataType.equals(Types.BOOLEAN)) {
+ return b -> b instanceof Boolean ? b : null;
+ }
+ if (dataType.equals(Types.BYTE)) {
+ return c -> {
+ if (c instanceof Byte) {
+ return c;
+ }
+ if (c instanceof Short) {
+ return ((Short) c).byteValue();
+ }
+ if (c instanceof Integer) {
+ return ((Integer) c).byteValue();
+ }
+ if (c instanceof Long) {
+ return ((Long) c).byteValue();
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.SHORT)) {
+ return c -> {
+ if (c instanceof Byte) {
+ return ((Byte) c).shortValue();
+ }
+ if (c instanceof Short) {
+ return c;
+ }
+ if (c instanceof Integer) {
+ return ((Integer) c).shortValue();
+ }
+ if (c instanceof Long) {
+ return ((Long) c).shortValue();
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.INT)) {
+ return c -> {
+ if (c instanceof Byte) {
+ return ((Byte) c).intValue();
+ }
+ if (c instanceof Short) {
+ return ((Short) c).intValue();
+ }
+ if (c instanceof Integer) {
+ return c;
+ }
+ if (c instanceof Long) {
+ return ((Long) c).intValue();
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.LONG)) {
+ return c -> {
+ if (c instanceof Byte) {
+ return ((Byte) c).longValue();
+ }
+ if (c instanceof Short) {
+ return ((Short) c).longValue();
+ }
+ if (c instanceof Integer) {
+ return ((Integer) c).longValue();
+ }
+ if (c instanceof Long) {
+ return c;
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.FLOAT)) {
+ return c -> {
+ if (c instanceof Float) {
+ return c;
+ }
+ if (c instanceof Double) {
+ return ((Double) c).floatValue();
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.DOUBLE)) {
+ return c -> {
+ if (c instanceof Float) {
+ return ((Float) c).doubleValue();
+ }
+ if (c instanceof Double) {
+ return c;
+ }
+ return null;
+ };
+ }
+ if (dataType.equals(Types.BIG_DEC)) {
+ return c -> c instanceof BigDecimal ? c : null;
+ }
+ if (dataType.equals(Types.SQL_DATE)) {
+ return c -> {
+ if (c instanceof Integer) {
+ long millisLocal = ((Integer) c).longValue() * 86400000;
+ long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
+ return new Date(millisUtc);
+ }
+ return null;
+ };
+ }
+
+ if (dataType.equals(Types.SQL_TIME)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? new Time(((Number) c).longValue() / 1000)
+ : null;
+ }
+
+ if (dataType.equals(Types.SQL_TIMESTAMP)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? new Timestamp(((Number) c).longValue() / 1000)
+ : null;
+ }
+
+ if (dataType.equals(Types.LOCAL_DATE)) {
+ return c -> {
+ if (c instanceof Integer) {
+ long millisLocal = ((Integer) c).longValue() * 86400000;
+ long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
+ return Instant.ofEpochMilli(millisUtc)
+ .atZone(ZoneId.systemDefault())
+ .toLocalDate();
+ }
+ return null;
+ };
+ }
+
+ if (dataType.equals(Types.LOCAL_TIME)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+ .atZone(ZoneId.systemDefault())
+ .toLocalTime()
+ : null;
+ }
+
+ if (dataType.equals(Types.LOCAL_DATE_TIME)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+ .atZone(ZoneId.systemDefault())
+ .toLocalDateTime()
+ : null;
+ }
+
+ if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+ : null;
+ }
+ if (dataType.equals(TimeIntervalTypeInfo.INTERVAL_MILLIS)) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? ((Number) c).longValue() / 1000
+ : null;
+ }
+ if (dataType.equals(Types.STRING)) {
+ return c -> c != null ? c.toString() : null;
+ }
+ if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+ return c -> {
+ if (c instanceof String) {
+ return ((String) c).getBytes(StandardCharsets.UTF_8);
+ }
+ if (c instanceof byte[]) {
+ return c;
+ }
+ return null;
+ };
+ }
+ if (dataType instanceof PrimitiveArrayTypeInfo
+ || dataType instanceof BasicArrayTypeInfo
+ || dataType instanceof ObjectArrayTypeInfo) {
+ TypeInformation<?> elementType =
+ dataType instanceof PrimitiveArrayTypeInfo
+ ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
+ : dataType instanceof BasicArrayTypeInfo
+ ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
+ : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
+ boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
+ Function<Object, Object> elementConverter = converter(elementType, config);
+ BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
+ arrayConstructor(elementType, primitive);
+ return c -> {
+ int length = -1;
+ Function<Integer, Object> elementGetter = null;
+ if (c instanceof List) {
+ length = ((List<?>) c).size();
+ elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
+ }
+ if (c != null && c.getClass().isArray()) {
+ length = Array.getLength(c);
+ elementGetter = i -> elementConverter.apply(Array.get(c, i));
+ }
+ if (elementGetter != null) {
+ return arrayConstructor.apply(length, elementGetter);
+ }
+ return null;
+ };
+ }
+ if (dataType instanceof MapTypeInfo) {
+ Function<Object, Object> keyConverter =
+ converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
+ Function<Object, Object> valueConverter =
+ converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
+ return c ->
+ c instanceof Map
+ ? ((Map<?, ?>) c)
+ .entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ e -> keyConverter.apply(e.getKey()),
+ e ->
+ valueConverter.apply(
+ e.getValue())))
+ : null;
+ }
+ if (dataType instanceof RowTypeInfo) {
+ TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
+ List<Function<Object, Object>> fieldConverters =
+ Arrays.stream(fieldTypes)
+ .map(x -> converter(x, config))
+ .collect(Collectors.toList());
+ return c -> {
+ if (c != null && c.getClass().isArray()) {
+ int length = Array.getLength(c);
+ if (length - 1 != fieldTypes.length) {
+ throw new IllegalStateException(
+ "Input row doesn't have expected number of values required by the schema. "
+ + fieldTypes.length
+ + " fields are required while "
+ + (length - 1)
+ + " values are provided.");
+ }
+
+ Row row = new Row(length - 1);
+ row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
+
+ for (int i = 0; i < row.getArity(); i++) {
+ row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
+ }
+
+ return row;
+ }
+ return null;
+ };
+ }
+ if (dataType instanceof TupleTypeInfo) {
+ TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
+ List<Function<Object, Object>> fieldConverters =
+ Arrays.stream(fieldTypes)
+ .map(x -> converter(x, config))
+ .collect(Collectors.toList());
+ return c -> {
+ if (c != null && c.getClass().isArray()) {
+ int length = Array.getLength(c);
+ if (length != fieldTypes.length) {
+ throw new IllegalStateException(
+ "Input tuple doesn't have expected number of values required by the schema. "
+ + fieldTypes.length
+ + " fields are required while "
+ + length
+ + " values are provided.");
+ }
+
+ Tuple tuple = Tuple.newInstance(length);
+ for (int i = 0; i < tuple.getArity(); i++) {
+ tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
+ }
+
+ return tuple;
+ }
+ return null;
+ };
+ }
+
+ return c -> {
+ if (c == null
+ || c.getClass() != byte[].class
+ || dataType instanceof PickledByteArrayTypeInfo) {
+ return c;
+ }
+
+ // other typeinfos will use the corresponding serializer to deserialize data.
+ byte[] b = (byte[]) c;
+ TypeSerializer<?> dataSerializer = dataType.createSerializer(config);
+ ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
+ DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
+ bais.setBuffer(b, 0, b.length);
+ try {
+ return dataSerializer.deserialize(baisWrapper);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to deserialize the object with datatype " + dataType, e);
+ }
+ };
+ }
+
+ private static int getOffsetFromLocalMillis(final long millisLocal) {
+ TimeZone localZone = TimeZone.getDefault();
+ int result = localZone.getRawOffset();
+ // the actual offset should be calculated based on milliseconds in UTC
+ int offset = localZone.getOffset(millisLocal - (long) result);
+ if (offset != result) {
+ // Daylight Saving Time
+ result = localZone.getOffset(millisLocal - (long) offset);
+ if (result != offset) {
+ // fallback to do the reverse lookup using java.time.LocalDateTime
+ // this should only happen near the start or end of DST
+ LocalDate localDate = LocalDate.ofEpochDay(millisLocal / 86400000L);
+ LocalTime localTime =
+ LocalTime.ofNanoOfDay(
+ Math.floorMod(millisLocal, 86400000L) * 1000L * 1000L);
+ LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
+ long millisEpoch =
+ localDateTime.atZone(localZone.toZoneId()).toInstant().toEpochMilli();
+ result = (int) (millisLocal - millisEpoch);
+ }
+ }
+
+ return result;
+ }
}
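The converter machinery above backs collection-based sources on the Python side:
unpickled values are coerced to the declared TypeInformation and wrapped in a
CollectionInputFormat. A sketch of the Python entry point whose data ends up here:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection(
        [(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
        type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))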
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java
new file mode 100644
index 00000000000..31012f9815a
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.utils.python.PythonDynamicTableOptions.BATCH_MODE;
+import static org.apache.flink.table.utils.python.PythonDynamicTableOptions.INPUT_FILE_PATH;
+
+/** Table source factory for PythonDynamicTableSource. */
+public class PythonDynamicTableFactory implements DynamicTableSourceFactory {
+
+ public static final String IDENTIFIER = "python-input-format";
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ ReadableConfig tableOptions = helper.getOptions();
+
+ String inputFilePath = tableOptions.get(INPUT_FILE_PATH);
+ ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+ boolean batched = tableOptions.get(BATCH_MODE);
+ return new PythonDynamicTableSource(inputFilePath, batched, schema.toPhysicalRowDataType());
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(INPUT_FILE_PATH);
+ options.add(BATCH_MODE);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+}
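
For reference, a table backed by this factory can be declared through DDL using the identifier and options defined above; a minimal sketch, with a hypothetical table name, schema, and file path:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PythonSourceDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'connector', 'file-path' and 'batched' map to IDENTIFIER, INPUT_FILE_PATH
            // and BATCH_MODE above; the table name and path are placeholders.
            tEnv.executeSql(
                    "CREATE TABLE python_source (id INT, name STRING) WITH ("
                            + "'connector' = 'python-input-format',"
                            + "'file-path' = '/tmp/python-elements.bin',"
                            + "'batched' = 'true')");
        }
    }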
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java
new file mode 100644
index 00000000000..68d748842d2
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for PythonDynamicTableSource. */
+public class PythonDynamicTableOptions {
+
+ public static final ConfigOption<String> INPUT_FILE_PATH =
+ ConfigOptions.key("file-path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The path of the input file.");
+
+ public static final ConfigOption<Boolean> BATCH_MODE =
+ ConfigOptions.key("batched")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether the values are serialized in batch.");
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java
new file mode 100644
index 00000000000..4a306803ea0
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.python.PythonBridgeUtils;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.IOException;
+
+/** Implementation of {@link ScanTableSource} for the Python elements table. */
+public class PythonDynamicTableSource implements ScanTableSource {
+ private final String filePath;
+ private final boolean batched;
+ private final DataType producedDataType;
+
+ public PythonDynamicTableSource(String filePath, boolean batched, DataType producedDataType) {
+ this.filePath = filePath;
+ this.batched = batched;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new PythonDynamicTableSource(filePath, batched, producedDataType);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Python Table Source";
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ try {
+ InputFormat<RowData, ?> inputFormat =
+ PythonTableUtils.getInputFormat(
+ PythonBridgeUtils.readPythonObjects(filePath, batched),
+ producedDataType);
+ return InputFormatProvider.of(inputFormat);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to read input data from %s.", filePath), e);
+ }
+ }
+}
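
Because the source reports ChangelogMode.insertOnly() and exposes an InputFormat, it is consumed like any other scan source once registered; a short usage sketch, assuming the tEnv and the hypothetical python_source table from the DDL sketch above:

    // Sketch only: scan the registered table and print the rows.
    org.apache.flink.table.api.Table t = tEnv.from("python_source");
    t.execute().collect().forEachRemaining(System.out::println);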
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java
deleted file mode 100644
index c6d6ff9887d..00000000000
--- a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/** An {@link InputFormatTableSource} created by python 'from_element' method. */
-@Internal
-public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {
-
- /**
- * The input format which contains the python data collection, usually created by {@link
- * PythonTableUtils#getInputFormat(List, TypeInformation, ExecutionConfig)} method.
- */
- private final InputFormat<Row, ? extends InputSplit> inputFormat;
-
- /**
- * The row type info of the python data. It is generated by the python 'from_element' method.
- */
- private final RowTypeInfo rowTypeInfo;
-
- public PythonInputFormatTableSource(
- InputFormat<Row, ? extends InputSplit> inputFormat, RowTypeInfo rowTypeInfo) {
- this.inputFormat = inputFormat;
- this.rowTypeInfo = rowTypeInfo;
- }
-
- @Override
- public InputFormat<Row, ?> getInputFormat() {
- return inputFormat;
- }
-
- @Override
- public TableSchema getTableSchema() {
- return TableSchema.fromTypeInfo(rowTypeInfo);
- }
-
- @Override
- public TypeInformation<Row> getReturnType() {
- return rowTypeInfo;
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
index f406c475edf..1d1ed0aab20 100644
--- a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
@@ -19,40 +19,64 @@
package org.apache.flink.table.utils.python;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.types.RowKind;
-import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TimeZone;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -65,205 +89,131 @@ public final class PythonTableUtils {
private PythonTableUtils() {}
/**
- * Wrap the unpickled python data with an InputFormat. It will be passed to
- * PythonInputFormatTableSource later.
+ * Create a table backed by a {@link PythonDynamicTableSource} that reads data from the given
+ * input file with the specified {@link DataType}.
*
- * @param data The unpickled python data.
- * @param dataType The python data type.
- * @param config The execution config used to create serializer.
- * @return An InputFormat containing the python data.
+ * @param tEnv The TableEnvironment in which to create the table.
+ * @param filePath The file path of the input data.
+ * @param schema The row data type of the input data.
+ * @param batched Whether the values are serialized in batch.
+ * @return A Table backed by an InputFormat that reads the input file.
*/
- public static InputFormat<Row, ?> getInputFormat(
- final List<Object[]> data,
- final TypeInformation<Row> dataType,
- final ExecutionConfig config) {
- Function<Object, Object> converter = converter(dataType, config);
- return new CollectionInputFormat<>(
- data.stream()
- .map(objects -> (Row) converter.apply(objects))
- .collect(Collectors.toList()),
- dataType.createSerializer(config));
+ public static Table createTableFromElement(
+ TableEnvironment tEnv, String filePath, DataType schema, boolean batched) {
+ TableDescriptor.Builder builder =
+ TableDescriptor.forConnector(PythonDynamicTableFactory.IDENTIFIER)
+ .option(PythonDynamicTableOptions.INPUT_FILE_PATH, filePath)
+ .option(PythonDynamicTableOptions.BATCH_MODE, batched)
+ .schema(Schema.newBuilder().fromRowDataType(schema).build());
+ return tEnv.from(builder.build());
}
/**
* Wrap the unpickled python data with an InputFormat. It will be passed to
- * StreamExecutionEnvironment.creatInput() to create an InputFormat later.
+ * PythonDynamicTableSource later.
*
* @param data The unpickled python data.
* @param dataType The python data type.
- * @param config The execution config used to create serializer.
* @return An InputFormat containing the python data.
*/
- @SuppressWarnings("unchecked")
- public static <T> InputFormat<T, ?> getCollectionInputFormat(
- final List<T> data, final TypeInformation<T> dataType, final ExecutionConfig config) {
- Function<Object, Object> converter = converter(dataType, config);
- return new CollectionInputFormat<>(
+ public static InputFormat<RowData, ?> getInputFormat(
+ final List<Object[]> data, final DataType dataType) {
+ Function<Object, Object> converter = converter(dataType.getLogicalType());
+ Collection<RowData> dataCollection =
data.stream()
- .map(objects -> (T) converter.apply(objects))
- .collect(Collectors.toList()),
- dataType.createSerializer(config));
+ .map(objects -> (RowData) converter.apply(objects))
+ .collect(Collectors.toList());
+ return new CollectionInputFormat<>(
+ dataCollection, InternalSerializers.create(dataType.getLogicalType()));
}
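
As a usage sketch for getInputFormat (all values hypothetical): the RowType converter further below expects each Object[] to carry a RowKind byte at index 0 followed by one value per field, so the input rows are shaped accordingly:

    // Sketch only; assumes the usual imports (DataTypes, RowKind, Arrays, etc.).
    DataType rowType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
    List<Object[]> data =
            Arrays.asList(
                    new Object[] {RowKind.INSERT.toByteValue(), 1, "hello"},
                    new Object[] {RowKind.INSERT.toByteValue(), 2, "world"});
    InputFormat<RowData, ?> inputFormat = PythonTableUtils.getInputFormat(data, rowType);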
private static BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor(
- final TypeInformation<?> elementType, final boolean primitiveArray) {
- if (elementType.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- boolean[] array = new boolean[length];
- for (int i = 0; i < length; i++) {
- array[i] = (boolean) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Boolean[] array = new Boolean[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Boolean) elementGetter.apply(i);
- }
- return array;
- };
- }
- }
- if (elementType.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- byte[] array = new byte[length];
- for (int i = 0; i < length; i++) {
- array[i] = (byte) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Byte[] array = new Byte[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Byte) elementGetter.apply(i);
- }
- return array;
- };
- }
+ final LogicalType elementType) {
+ if (elementType instanceof BooleanType) {
+ return (length, elementGetter) -> {
+ Boolean[] array = new Boolean[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Boolean) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- short[] array = new short[length];
- for (int i = 0; i < length; i++) {
- array[i] = (short) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Short[] array = new Short[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Short) elementGetter.apply(i);
- }
- return array;
- };
- }
+ if (elementType instanceof TinyIntType) {
+ return (length, elementGetter) -> {
+ Byte[] array = new Byte[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Byte) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.INT_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- int[] array = new int[length];
- for (int i = 0; i < length; i++) {
- array[i] = (int) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Integer[] array = new Integer[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Integer) elementGetter.apply(i);
- }
- return array;
- };
- }
+
+ if (elementType instanceof IntType) {
+ return (length, elementGetter) -> {
+ Integer[] array = new Integer[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Integer) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- long[] array = new long[length];
- for (int i = 0; i < length; i++) {
- array[i] = (long) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Long[] array = new Long[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Long) elementGetter.apply(i);
- }
- return array;
- };
- }
+ if (elementType instanceof BigIntType) {
+ return (length, elementGetter) -> {
+ Long[] array = new Long[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Long) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- float[] array = new float[length];
- for (int i = 0; i < length; i++) {
- array[i] = (float) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Float[] array = new Float[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Float) elementGetter.apply(i);
- }
- return array;
- };
- }
+ if (elementType instanceof FloatType) {
+ return (length, elementGetter) -> {
+ Float[] array = new Float[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Float) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
- if (primitiveArray) {
- return (length, elementGetter) -> {
- double[] array = new double[length];
- for (int i = 0; i < length; i++) {
- array[i] = (double) elementGetter.apply(i);
- }
- return array;
- };
- } else {
- return (length, elementGetter) -> {
- Double[] array = new Double[length];
- for (int i = 0; i < length; i++) {
- array[i] = (Double) elementGetter.apply(i);
- }
- return array;
- };
- }
+ if (elementType instanceof DoubleType) {
+ return (length, elementGetter) -> {
+ Double[] array = new Double[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = (Double) elementGetter.apply(i);
+ }
+ return new GenericArrayData(array);
+ };
}
- if (elementType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+ if (elementType instanceof CharType || elementType instanceof VarCharType) {
return (length, elementGetter) -> {
- String[] array = new String[length];
+ StringData[] array = new StringData[length];
for (int i = 0; i < length; i++) {
- array[i] = (String) elementGetter.apply(i);
+ array[i] = (StringData) elementGetter.apply(i);
}
- return array;
+ return new GenericArrayData(array);
};
}
+
return (length, elementGetter) -> {
Object[] array = new Object[length];
for (int i = 0; i < length; i++) {
array[i] = elementGetter.apply(i);
}
- return array;
+ return new GenericArrayData(array);
};
}
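
Each branch above returns a (length, elementGetter) constructor that materializes a GenericArrayData; a small sketch of applying such a constructor to a hypothetical boxed payload:

    // Sketch only: an IntType-style constructor applied to a List payload.
    java.util.List<Object> payload = java.util.Arrays.asList(1, 2, 3);
    java.util.function.BiFunction<Integer, java.util.function.Function<Integer, Object>, Object>
            ctor =
                    (length, elementGetter) -> {
                        Integer[] array = new Integer[length];
                        for (int i = 0; i < length; i++) {
                            array[i] = (Integer) elementGetter.apply(i);
                        }
                        return new GenericArrayData(array);
                    };
    GenericArrayData arrayData = (GenericArrayData) ctor.apply(payload.size(), payload::get);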
- private static Function<Object, Object> converter(
- final TypeInformation<?> dataType, final ExecutionConfig config) {
- if (dataType.equals(Types.BOOLEAN())) {
+ private static Function<Object, Object> converter(LogicalType logicalType) {
+
+ if (logicalType instanceof NullType) {
+ return n -> null;
+ }
+
+ if (logicalType instanceof BooleanType) {
return b -> b instanceof Boolean ? b : null;
}
- if (dataType.equals(Types.BYTE())) {
+
+ if (logicalType instanceof TinyIntType) {
return c -> {
if (c instanceof Byte) {
return c;
@@ -280,7 +230,8 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.SHORT())) {
+
+ if (logicalType instanceof SmallIntType) {
return c -> {
if (c instanceof Byte) {
return ((Byte) c).shortValue();
@@ -297,7 +248,8 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.INT())) {
+
+ if (logicalType instanceof IntType) {
return c -> {
if (c instanceof Byte) {
return ((Byte) c).intValue();
@@ -314,7 +266,8 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.LONG())) {
+
+ if (logicalType instanceof BigIntType) {
return c -> {
if (c instanceof Byte) {
return ((Byte) c).longValue();
@@ -331,7 +284,8 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.FLOAT())) {
+
+ if (logicalType instanceof FloatType) {
return c -> {
if (c instanceof Float) {
return c;
@@ -342,7 +296,7 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.DOUBLE())) {
+ if (logicalType instanceof DoubleType) {
return c -> {
if (c instanceof Float) {
return ((Float) c).doubleValue();
@@ -353,48 +307,81 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType.equals(Types.DECIMAL())) {
- return c -> c instanceof BigDecimal ? c : null;
+
+ if (logicalType instanceof DecimalType) {
+ int precision = ((DecimalType) logicalType).getPrecision();
+ int scale = ((DecimalType) logicalType).getScale();
+ return c ->
+ c instanceof BigDecimal
+ ? DecimalData.fromBigDecimal((BigDecimal) c, precision, scale)
+ : null;
}
- if (dataType.equals(Types.SQL_DATE())) {
+
+ if (logicalType instanceof DateType) {
return c -> {
if (c instanceof Integer) {
- long millisLocal = ((Integer) c).longValue() * 86400000;
- long millisUtc =
- millisLocal - PythonTableUtils.getOffsetFromLocalMillis(millisLocal);
- return new Date(millisUtc);
+ return (Integer) c;
}
return null;
};
}
- if (dataType.equals(Types.SQL_TIME())) {
+
+ if (logicalType instanceof TimeType) {
+ return c -> {
+ if (c instanceof Integer || c instanceof Long) {
+ long millisLocal = ((Number) c).longValue() / 1000;
+ long millisUtc = millisLocal + getOffsetFromLocalMillis(millisLocal);
+ return (int) millisUtc;
+ }
+ return null;
+ };
+ }
+
+ if (logicalType instanceof TimestampType) {
+ return c ->
+ c instanceof Integer || c instanceof Long
+ ? TimestampData.fromLocalDateTime(
+ Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+ .atZone(ZoneId.systemDefault())
+ .toLocalDateTime())
+ : null;
+ }
+
+ if (logicalType instanceof ZonedTimestampType) {
return c ->
c instanceof Integer || c instanceof Long
- ? new Time(((Number) c).longValue() / 1000)
+ ? TimestampData.fromInstant(
+ Instant.ofEpochMilli(((Number) c).longValue() / 1000))
: null;
}
- if (dataType.equals(Types.SQL_TIMESTAMP())) {
+
+ if (logicalType instanceof LocalZonedTimestampType) {
return c ->
c instanceof Integer || c instanceof Long
- ? new Timestamp(((Number) c).longValue() / 1000)
+ ? TimestampData.fromInstant(
+ Instant.ofEpochMilli(((Number) c).longValue() / 1000))
: null;
}
- if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
+
+ if (logicalType instanceof DayTimeIntervalType) {
return c ->
c instanceof Integer || c instanceof Long
- ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+ ? ((Number) c).longValue() / 1000
: null;
}
- if (dataType.equals(Types.INTERVAL_MILLIS())) {
+
+ if (logicalType instanceof YearMonthIntervalType) {
return c ->
c instanceof Integer || c instanceof Long
? ((Number) c).longValue() / 1000
: null;
}
- if (dataType.equals(Types.STRING())) {
- return c -> c != null ? c.toString() : null;
+
+ if (logicalType instanceof CharType || logicalType instanceof VarCharType) {
+ return c -> c != null ? StringData.fromString(c.toString()) : null;
}
- if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+
+ if (logicalType instanceof BinaryType || logicalType instanceof VarBinaryType) {
return c -> {
if (c instanceof String) {
return ((String) c).getBytes(StandardCharsets.UTF_8);
@@ -405,19 +392,12 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType instanceof PrimitiveArrayTypeInfo
- || dataType instanceof BasicArrayTypeInfo
- || dataType instanceof ObjectArrayTypeInfo) {
- TypeInformation<?> elementType =
- dataType instanceof PrimitiveArrayTypeInfo
- ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
- : dataType instanceof BasicArrayTypeInfo
- ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
- : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
- boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
- Function<Object, Object> elementConverter = converter(elementType, config);
+
+ if (logicalType instanceof ArrayType) {
+ LogicalType elementType = ((ArrayType) logicalType).getElementType();
+ Function<Object, Object> elementConverter = converter(elementType);
BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
- arrayConstructor(elementType, primitive);
+ arrayConstructor(elementType);
return c -> {
int length = -1;
Function<Integer, Object> elementGetter = null;
@@ -435,28 +415,39 @@ public final class PythonTableUtils {
return null;
};
}
- if (dataType instanceof MapTypeInfo) {
- Function<Object, Object> keyConverter =
- converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
+
+ if (logicalType instanceof MultisetType) {
+ return c -> c;
+ }
+
+ if (logicalType instanceof MapType) {
+ Function<Object, Object> keyConverter = converter(((MapType) logicalType).getKeyType());
Function<Object, Object> valueConverter =
- converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
- return c ->
- c instanceof Map
- ? ((Map<?, ?>) c)
+ converter(((MapType) logicalType).getValueType());
+
+ return c -> {
+ if (c instanceof Map) {
+ Map<?, ?> mapData =
+ ((Map<?, ?>) c)
.entrySet().stream()
.collect(
Collectors.toMap(
e -> keyConverter.apply(e.getKey()),
e ->
valueConverter.apply(
- e.getValue())))
- : null;
+ e.getValue())));
+ return new GenericMapData(mapData);
+ } else {
+ return null;
+ }
+ };
}
- if (dataType instanceof RowTypeInfo) {
- TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
+
+ if (logicalType instanceof RowType) {
+ LogicalType[] fieldTypes = logicalType.getChildren().toArray(new LogicalType[0]);
List<Function<Object, Object>> fieldConverters =
Arrays.stream(fieldTypes)
- .map(x -> PythonTableUtils.converter(x, config))
+ .map(PythonTableUtils::converter)
.collect(Collectors.toList());
return c -> {
if (c != null && c.getClass().isArray()) {
@@ -470,8 +461,8 @@ public final class PythonTableUtils {
+ " values are provided.");
}
- Row row = new Row(length - 1);
- row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
+ GenericRowData row = new GenericRowData(length - 1);
+ row.setRowKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
for (int i = 0; i < row.getArity(); i++) {
row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
@@ -481,56 +472,19 @@ public final class PythonTableUtils {
}
return null;
};
- }
- if (dataType instanceof TupleTypeInfo) {
- TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
- List<Function<Object, Object>> fieldConverters =
- Arrays.stream(fieldTypes)
- .map(x -> PythonTableUtils.converter(x, config))
- .collect(Collectors.toList());
- return c -> {
- if (c != null && c.getClass().isArray()) {
- int length = Array.getLength(c);
- if (length != fieldTypes.length) {
- throw new IllegalStateException(
- "Input tuple doesn't have expected number of values required by the schema. "
- + fieldTypes.length
- + " fields are required while "
- + length
- + " values are provided.");
- }
-
- Tuple tuple = Tuple.newInstance(length);
- for (int i = 0; i < tuple.getArity(); i++) {
- tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
- }
-
- return tuple;
- }
- return null;
- };
- }
-
- return c -> {
- if (c == null
- || c.getClass() != byte[].class
- || dataType instanceof PickledByteArrayTypeInfo) {
- return c;
+ } else if (logicalType instanceof StructuredType) {
+ Optional<Class<?>> implClass = ((StructuredType) logicalType).getImplementationClass();
+ if (implClass.isPresent()
+ && (implClass.get() == ListView.class || implClass.get() == MapView.class)) {
+ return converter(logicalType.getChildren().get(0));
}
+ throw new IllegalStateException(
+ "Failed to get the data converter for StructuredType with implementation "
+ + "class: "
+ + implClass.orElse(null));
+ }
- // other typeinfos will use the corresponding serializer to deserialize data.
- byte[] b = (byte[]) c;
- TypeSerializer<?> dataSerializer = dataType.createSerializer(config);
- ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
- DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
- bais.setBuffer(b, 0, b.length);
- try {
- return dataSerializer.deserialize(baisWrapper);
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to deserialize the object with datatype " + dataType, e);
- }
- };
+ throw new IllegalStateException("Failed to get converter for LogicalType: " + logicalType);
}
private static int getOffsetFromLocalMillis(final long millisLocal) {
diff --git a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 92%
copy from flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 511a31a088b..a1a1ce73c4a 100644
--- a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.legacyutils.TestCollectionTableFactory
+org.apache.flink.table.utils.python.PythonDynamicTableFactory
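
Moving the entry from the legacy TableFactory service file to org.apache.flink.table.factories.Factory is what makes the new factory discoverable at runtime (FactoryUtil performs this lookup internally via the JDK ServiceLoader); an illustrative sketch of that discovery:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // Sketch only: enumerate Factory implementations on the classpath and
            // locate the one registered by the service file above.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if ("python-input-format".equals(factory.factoryIdentifier())) {
                    System.out.println("Discovered: " + factory.getClass().getName());
                }
            }
        }
    }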
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
deleted file mode 100644
index ad9ebb44fad..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.functions.AggregateFunction;
-
-/** {@link AggregateFunction} for {@link Byte}. */
-public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> {
-
- private static final long serialVersionUID = 1233840393767061909L;
-
- @Override
- public MaxAccumulator<Byte> createAccumulator() {
- final MaxAccumulator<Byte> acc = new MaxAccumulator<>();
- resetAccumulator(acc);
- return acc;
- }
-
- public void accumulate(MaxAccumulator<Byte> acc, Byte value) {
- if (value != null) {
- if (!acc.f1 || Byte.compare(acc.f0, value) < 0) {
- acc.f0 = value;
- acc.f1 = true;
- }
- }
- }
-
- @Override
- public Byte getValue(MaxAccumulator<Byte> acc) {
- if (acc.f1) {
- return acc.f0;
- } else {
- return null;
- }
- }
-
- public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) {
- its.forEach(
- a -> {
- if (a.f1) {
- accumulate(acc, a.f0);
- }
- });
- }
-
- public void resetAccumulator(MaxAccumulator<Byte> acc) {
- acc.f0 = 0;
- acc.f1 = false;
- }
-
- @Override
- public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() {
- return new TupleTypeInfo(
- MaxAccumulator.class,
- BasicTypeInfo.BYTE_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO);
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
deleted file mode 100644
index d69cce51acf..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
-import org.apache.flink.types.Row;
-
-/** A watermark assigner that throws an exception if a watermark is requested. */
-public class CustomAssigner extends PunctuatedWatermarkAssigner {
- private static final long serialVersionUID = -4900176786361416000L;
-
- @Override
- public Watermark getWatermark(Row row, long timestamp) {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
deleted file mode 100644
index cf6673315aa..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionUtils;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedFieldReference;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.util.Preconditions;
-
-/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */
-public class CustomExtractor extends TimestampExtractor {
- private static final long serialVersionUID = 6784900460276023738L;
-
- private final String field = "ts";
-
- @Override
- public String[] getArgumentFields() {
- return new String[] {field};
- }
-
- @Override
- public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
- if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
- throw new ValidationException(
- String.format(
- "Field 'ts' must be of type Timestamp but is of type %s.",
- argumentFieldTypes[0]));
- }
- }
-
- @Override
- public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
- ResolvedFieldReference fieldAccess = fieldAccesses[0];
- Preconditions.checkArgument(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
- FieldReferenceExpression fieldReferenceExpr =
- new FieldReferenceExpression(
- fieldAccess.name(),
- TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
- 0,
- fieldAccess.fieldIndex());
- return ApiExpressionUtils.unresolvedCall(
- BuiltInFunctionDefinitions.CAST,
- fieldReferenceExpr,
- ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
deleted file mode 100644
index 191aa8754a4..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/** Utility class to make working with tuples more readable. */
-public class MaxAccumulator<T> extends Tuple2<T, Boolean> {
- private static final long serialVersionUID = 6089142148200600733L;
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
deleted file mode 100644
index 0b28990908e..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.ScalarFunction;
-
-import static org.assertj.core.api.Assertions.fail;
-
-/**
- * Testing scalar function to verify that lifecycle methods are called in the expected order and
- * only once.
- */
-public class RichFunc0 extends ScalarFunction {
- private static final long serialVersionUID = 931156471687322386L;
-
- private boolean openCalled = false;
- private boolean closeCalled = false;
-
- @Override
- public void open(FunctionContext context) throws Exception {
- super.open(context);
- if (openCalled) {
- fail("Open called more than once.");
- } else {
- openCalled = true;
- }
- if (closeCalled) {
- fail("Close called before open.");
- }
- }
-
- public void eval(int index) {
- if (!openCalled) {
- fail("Open was not called before eval.");
- }
- if (closeCalled) {
- fail("Close called before eval.");
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (closeCalled) {
- fail("Close called more than once.");
- } else {
- closeCalled = true;
- }
- if (!openCalled) {
- fail("Open was not called before close.");
- }
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
deleted file mode 100644
index 9fed5168e25..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/** A collector for storing results in memory. */
-class RowCollector {
- private static final ArrayDeque<Tuple2<Boolean, Row>> sink = new ArrayDeque<>();
-
- public static void addValue(Tuple2<Boolean, Row> value) {
- synchronized (sink) {
- sink.add(value.copy());
- }
- }
-
- public static List<Tuple2<Boolean, Row>> getAndClearValues() {
- final ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<>(sink);
- sink.clear();
- return out;
- }
-
- public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
- final Map<String, Integer> retracted =
- results.stream()
- .collect(
- Collectors.groupingBy(
- r -> r.f1.toString(),
- Collectors.mapping(
- r -> r.f0 ? 1 : -1,
- Collectors.reducing(
- 0, (left, right) -> left + right))));
-
- if (retracted.values().stream().anyMatch(c -> c < 0)) {
- throw new AssertionError("Received retracted rows which have not been accumulated.");
- }
-
- return retracted.entrySet().stream()
- .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> e.getKey()))
- .collect(Collectors.toList());
- }
-
- public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
- final HashMap<Row, String> upserted = new HashMap<>();
- for (Tuple2<Boolean, Row> r : results) {
- final Row key = Row.project(r.f1, keys);
- if (r.f0) {
- upserted.put(key, r.f1.toString());
- } else {
- upserted.remove(key);
- }
- }
- return new ArrayList<>(upserted.values());
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
deleted file mode 100644
index 0051d9ceb6f..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.types.Row;
-
-/** A sink that stores data in the {@link RowCollector}. */
-public class RowSink implements SinkFunction<Tuple2<Boolean, Row>> {
- private static final long serialVersionUID = -7264802354440479084L;
-
- @Override
- public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
- RowCollector.addValue(value);
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
deleted file mode 100644
index 4e34e65106b..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.table.functions.TableFunction;
-
-/** A function that splits strings, optionally adding a prefix. */
-public class TableFunc1 extends TableFunction<String> {
-
- private static final long serialVersionUID = -5471603822898040617L;
-
- public void eval(String str) {
- if (str.contains("#")) {
- for (String s : str.split("#")) {
- collect(s);
- }
- }
- }
-
- public void eval(String str, String prefix) {
- if (str.contains("#")) {
- for (String s : str.split("#")) {
- collect(prefix + s);
- }
- }
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
deleted file mode 100644
index cde54670581..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/** Testing append sink. */
-public class TestAppendSink implements AppendStreamTableSink<Row> {
-
- private String[] fNames = null;
- private TypeInformation<?>[] fTypes = null;
-
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
- return dataStream
- .map(
- new MapFunction<Row, Tuple2<Boolean, Row>>() {
- private static final long serialVersionUID = 4671583708680989488L;
-
- @Override
- public Tuple2<Boolean, Row> map(Row value) throws Exception {
- return Tuple2.of(true, value);
- }
- })
- .addSink(new RowSink());
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return new RowTypeInfo(fTypes, fNames);
- }
-
- @Override
- public String[] getFieldNames() {
- return fNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fTypes;
- }
-
- @Override
- public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- final TestAppendSink copy = new TestAppendSink();
- copy.fNames = fieldNames;
- copy.fTypes = fieldTypes;
- return copy;
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
deleted file mode 100644
index dfa2db4149e..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.LookupableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Factory for the testing sinks. */
-public class TestCollectionTableFactory
- implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {
-
- private static boolean isStreaming = true;
-
- private static final List<Row> SOURCE_DATA = new LinkedList<>();
- private static final List<Row> DIM_DATA = new LinkedList<>();
- private static final List<Row> RESULT = new LinkedList<>();
-
- private long emitIntervalMS = -1L;
-
- @Override
- public TableSource<Row> createTableSource(Map<String, String> properties) {
- return getCollectionSource(properties, isStreaming);
- }
-
- @Override
- public TableSink<Row> createTableSink(Map<String, String> properties) {
- return getCollectionSink(properties);
- }
-
- @Override
- public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
- return getCollectionSource(properties, true);
- }
-
- @Override
- public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
- return getCollectionSink(properties);
- }
-
- private CollectionTableSource getCollectionSource(
- Map<String, String> props, boolean isStreaming) {
- final DescriptorProperties properties = new DescriptorProperties();
- properties.putProperties(props);
- final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
- final Optional<Integer> parallelism = properties.getOptionalInt("parallelism");
- return new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism);
- }
-
- private CollectionTableSink getCollectionSink(Map<String, String> props) {
- final DescriptorProperties properties = new DescriptorProperties();
- properties.putProperties(props);
- final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
- return new CollectionTableSink((RowTypeInfo) schema.toRowType());
- }
-
- @Override
- public Map<String, String> requiredContext() {
- final HashMap<String, String> context = new HashMap<>();
- context.put(ConnectorDescriptorValidator.CONNECTOR, "COLLECTION");
- return context;
- }
-
- @Override
- public List<String> supportedProperties() {
- return Arrays.asList("*");
- }
-
- private static class CollectionTableSource
- implements StreamTableSource<Row>, LookupableTableSource<Row> {
- private final long emitIntervalMs;
- private final TableSchema schema;
- private final boolean isStreaming;
- private final Optional<Integer> parallelism;
- private final TypeInformation<Row> rowType;
-
- private CollectionTableSource(
- long emitIntervalMs,
- TableSchema schema,
- boolean isStreaming,
- Optional<Integer> parallelism) {
- this.emitIntervalMs = emitIntervalMs;
- this.schema = schema;
- this.isStreaming = isStreaming;
- this.parallelism = parallelism;
- this.rowType = schema.toRowType();
- }
-
- @Override
- public boolean isBounded() {
- return !isStreaming;
- }
-
- @Override
- public DataStream<Row> getDataStream(StreamExecutionEnvironment streamEnv) {
- final DataStreamSource<Row> dataStream =
- streamEnv.createInput(
- new TestCollectionInputFormat<>(
- emitIntervalMs,
- SOURCE_DATA,
- rowType.createSerializer(new ExecutionConfig())),
- rowType);
- if (parallelism.isPresent()) {
- dataStream.setParallelism(parallelism.get());
- }
- return dataStream;
- }
-
- @Override
- public TypeInformation<Row> getReturnType() {
- return rowType;
- }
-
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Override
- public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
- final String[] schemaFieldNames = schema.getFieldNames();
- final int[] keys =
- Arrays.stream(lookupKeys)
- .map(
- k -> {
- for (int x = 0; x < schemaFieldNames.length; x++) {
- if (k.equals(schemaFieldNames[x])) {
- return x;
- }
- }
- throw new IllegalStateException();
- })
- .mapToInt(i -> i)
- .toArray();
-
- return new TemporalTableFetcher(DIM_DATA, keys);
- }
-
- @Override
- public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
- return null;
- }
-
- @Override
- public boolean isAsyncEnabled() {
- return false;
- }
- }
-
- private static class CollectionTableSink implements AppendStreamTableSink<Row> {
- private final RowTypeInfo outputType;
-
- private CollectionTableSink(RowTypeInfo outputType) {
- this.outputType = outputType;
- }
-
- @Override
- public RowTypeInfo getOutputType() {
- return outputType;
- }
-
- @Override
- public String[] getFieldNames() {
- return outputType.getFieldNames();
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return outputType.getFieldTypes();
- }
-
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
- return dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1);
- }
-
- @Override
- public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- return this;
- }
- }
-
- private static class UnsafeMemorySinkFunction extends RichSinkFunction<Row> {
- private static final long serialVersionUID = -7880686562734099699L;
-
- private final TypeInformation<Row> outputType;
- private TypeSerializer<Row> serializer = null;
-
- private UnsafeMemorySinkFunction(TypeInformation<Row> outputType) {
- this.outputType = outputType;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- serializer = outputType.createSerializer(new ExecutionConfig());
- }
-
- @Override
- public void invoke(Row row, Context context) throws Exception {
- RESULT.add(serializer.copy(row));
- }
- }
-
- private static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
-
- private static final long serialVersionUID = -3222731547793350189L;
-
- private final long emitIntervalMs;
-
- public TestCollectionInputFormat(
- long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
- super(dataSet, serializer);
- this.emitIntervalMs = emitIntervalMs;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- if (emitIntervalMs > 0) {
- try {
- Thread.sleep(emitIntervalMs);
- } catch (InterruptedException e) {
- }
- }
- return super.reachedEnd();
- }
- }
-
- private static class TemporalTableFetcher extends TableFunction<Row> {
- private static final long serialVersionUID = 6248306950388784015L;
-
- private final List<Row> dimData;
- private final int[] keys;
-
- private TemporalTableFetcher(List<Row> dimData, int[] keys) {
- this.dimData = dimData;
- this.keys = keys;
- }
-
- public void eval(Row values) {
- for (Row data : dimData) {
- boolean matched = true;
- int idx = 0;
- while (matched && idx < keys.length) {
- final Object dimField = data.getField(keys[idx]);
- final Object inputField = values.getField(idx);
- matched = dimField.equals(inputField);
- idx += 1;
- }
- if (matched) {
- // copy the row data
- final Row ret = new Row(data.getArity());
- for (int x = 0; x < data.getArity(); x++) {
- ret.setField(x, data.getField(x));
- }
- collect(ret);
- }
- }
- }
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
deleted file mode 100644
index 449aea756e4..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/** Testing retract sink. */
-public class TestRetractSink implements RetractStreamTableSink<Row> {
-
- private String[] fNames = null;
- private TypeInformation<?>[] fTypes = null;
-
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
- return dataStream.addSink(new RowSink());
- }
-
- @Override
- public TypeInformation<Row> getRecordType() {
- return new RowTypeInfo(fTypes, fNames);
- }
-
- @Override
- public String[] getFieldNames() {
- return fNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fTypes;
- }
-
- @Override
- public TableSink<Tuple2<Boolean, Row>> configure(
- String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- final TestRetractSink copy = new TestRetractSink();
- copy.fNames = fieldNames;
- copy.fTypes = fieldTypes;
- return copy;
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
deleted file mode 100644
index 1740d5380a8..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sinks.UpsertStreamTableSink;
-import org.apache.flink.types.Row;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** testing upsert sink. */
-public class TestUpsertSink implements UpsertStreamTableSink<Row> {
-
- private String[] fNames = null;
- private TypeInformation<?>[] fTypes = null;
-
- private final String[] expectedKeys;
- private final boolean expectedIsAppendOnly;
-
- public TestUpsertSink(String[] expectedKeys, boolean expectedIsAppendOnly) {
- this.expectedKeys = expectedKeys;
- this.expectedIsAppendOnly = expectedIsAppendOnly;
- }
-
- @Override
- public void setKeyFields(String[] keys) {
- assertThat(keys)
- .as("Provided key fields do not match expected keys")
- .containsExactlyInAnyOrder(expectedKeys);
- }
-
- @Override
- public void setIsAppendOnly(Boolean isAppendOnly) {
- assertThat(isAppendOnly)
- .as("Provided isAppendOnly does not match expected isAppendOnly")
- .isEqualTo(expectedIsAppendOnly);
- }
-
- @Override
- public TypeInformation<Row> getRecordType() {
- return new RowTypeInfo(fTypes, fNames);
- }
-
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> s) {
- return s.addSink(new RowSink());
- }
-
- @Override
- public String[] getFieldNames() {
- return fNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fTypes;
- }
-
- @Override
- public TableSink<Tuple2<Boolean, Row>> configure(
- String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- final TestUpsertSink copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly);
- copy.fNames = fieldNames;
- copy.fTypes = fieldTypes;
- return copy;
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java
new file mode 100644
index 00000000000..125c1317352
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Testing CollectionTableFactory that creates collection DynamicTableSource and DynamicTableSink.
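+ *
+ * <p>Illustrative usage from a test (the names {@code tEnv}, {@code sourceRows} and the table
+ * schema below are placeholders, not part of this factory's API):
+ *
+ * <pre>{@code
+ * TestCollectionTableFactory.reset();
+ * TestCollectionTableFactory.initData(sourceRows, Collections.emptyList(), null);
+ * tEnv.executeSql("CREATE TABLE src(a INT, b STRING) WITH ('connector' = 'COLLECTION')");
+ * }</pre>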
+ */
+public class TestCollectionTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static boolean isStreaming = true;
+ private static final LinkedList<Row> SOURCE_DATA = new LinkedList<>();
+ private static final LinkedList<Row> DIM_DATA = new LinkedList<>();
+ private static final LinkedList<Row> RESULT = new LinkedList<>();
+
+ private static long emitIntervalMS = -1L;
+
+ public static void initData(List<Row> sourceData, List<Row> dimData, Long emitInterval) {
+ SOURCE_DATA.addAll(sourceData);
+ DIM_DATA.addAll(dimData);
+ emitIntervalMS = emitInterval == null ? -1L : emitInterval;
+ }
+
+ public static void reset() {
+ RESULT.clear();
+ SOURCE_DATA.clear();
+ DIM_DATA.clear();
+ emitIntervalMS = -1L;
+ }
+
+ public static CollectionTableSource getCollectionSource(
+ ResolvedCatalogTable catalogTable, boolean isStreaming) {
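+        // The optional "parallelism" table option pins the source parallelism; when absent,
+        // the environment default applies.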
+        Optional<Integer> parallelism =
+                Optional.ofNullable(catalogTable.getOptions().get("parallelism"))
+                        .map(Integer::parseInt);
+ return new CollectionTableSource(
+ emitIntervalMS,
+ catalogTable.getResolvedSchema().toSourceRowDataType(),
+ isStreaming,
+ parallelism);
+ }
+
+ public static CollectionTableSink getCollectionSink(ResolvedCatalogTable catalogTable) {
+ return new CollectionTableSink(catalogTable.getResolvedSchema().toSinkRowDataType());
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ return getCollectionSource(
+ context.getCatalogTable(), TestCollectionTableFactory.isStreaming);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "COLLECTION";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ return getCollectionSink(context.getCatalogTable());
+ }
+
+ static class CollectionTableSource implements ScanTableSource, LookupTableSource {
+
+ private final Long emitIntervalMS;
+ private final DataType rowType;
+ private final boolean isStreaming;
+ private final Optional<Integer> parallelism;
+
+ public CollectionTableSource(
+ Long emitIntervalMS,
+ DataType rowType,
+ boolean isStreaming,
+ Optional<Integer> parallelism) {
+ this.emitIntervalMS = emitIntervalMS;
+ this.rowType = rowType;
+ this.isStreaming = isStreaming;
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new CollectionTableSource(emitIntervalMS, rowType, isStreaming, parallelism);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "CollectionTableSource";
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
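+            // Each entry of context.getKeys() is an index path into the produced row; this
+            // test source supports only top-level lookup keys, so the first index is used.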
+ int[] lookupIndices = Arrays.stream(context.getKeys()).mapToInt(k -> k[0]).toArray();
+ return TableFunctionProvider.of(new TemporalTableFetcher(DIM_DATA, lookupIndices));
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
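+            // Eagerly convert the external Row test data to the internal RowData format so
+            // the collection input format can replay it with a matching serializer.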
+ TypeInformation<RowData> type = runtimeProviderContext.createTypeInformation(rowType);
+ TypeSerializer<RowData> serializer = type.createSerializer(new ExecutionConfig());
+ DataStructureConverter converter =
+ runtimeProviderContext.createDataStructureConverter(rowType);
+ List<RowData> rowData =
+ SOURCE_DATA.stream()
+ .map(row -> (RowData) converter.toInternal(row))
+ .collect(Collectors.toList());
+
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+ DataStreamSource<RowData> dataStream =
+ execEnv.createInput(
+ new TestCollectionInputFormat<>(
+ emitIntervalMS, rowData, serializer),
+ type);
+ parallelism.ifPresent(dataStream::setParallelism);
+ return dataStream;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return !isStreaming;
+ }
+ };
+ }
+ }
+
+ static class CollectionTableSink implements DynamicTableSink {
+
+ private final DataType outputType;
+
+ public CollectionTableSink(DataType outputType) {
+ this.outputType = outputType;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
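+            // Accept whatever changelog mode the planner requests; the collecting sink below
+            // simply stores every row it receives.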
+ return requestedMode;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ TypeInformation<Row> typeInformation = context.createTypeInformation(outputType);
+ DataStructureConverter converter = context.createDataStructureConverter(outputType);
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return dataStream
+ .addSink(new UnsafeMemorySinkFunction(typeInformation, converter))
+ .setParallelism(1);
+ }
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new CollectionTableSink(outputType);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("CollectionTableSink(%s)", outputType);
+ }
+ }
+
+ static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
+ private final long emitIntervalMs;
+
+ public TestCollectionInputFormat(
+ long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
+ super(dataSet, serializer);
+ this.emitIntervalMs = emitIntervalMs;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
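+            // When an emit interval is configured, sleep between records to emulate a slow
+            // streaming source.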
+ if (emitIntervalMs > 0) {
+ try {
+ Thread.sleep(emitIntervalMs);
+                } catch (InterruptedException ignored) {
+                    // Thread.sleep only throws InterruptedException; throttling is best-effort.
+                }
+ }
+ return super.reachedEnd();
+ }
+ }
+
+ static class TemporalTableFetcher extends TableFunction<Row> {
+ private final LinkedList<Row> dimData;
+        private final int[] keys;
+
+        public TemporalTableFetcher(LinkedList<Row> dimData, int[] keys) {
+            this.dimData = dimData;
+            this.keys = keys;
+        }
+
+        public void eval(Object... values) {
+            for (Row data : dimData) {
+                boolean matched = true;
+                int idx = 0;
+                while (matched && idx < keys.length) {
+                    Object dimField = data.getField(keys[idx]);
+ Object inputField = values[idx];
+ if (dimField != null) {
+ matched = dimField.equals(inputField);
+ } else {
+ matched = inputField == null;
+ }
+ idx += 1;
+ }
+
+ if (matched) {
+ Row ret = new Row(data.getArity());
+ for (int i = 0; i < data.getArity(); i++) {
+ ret.setField(i, data.getField(i));
+ }
+ collect(ret);
+ }
+ }
+ }
+ }
+
+ static class UnsafeMemorySinkFunction extends RichSinkFunction<RowData> {
+ private TypeSerializer<Row> serializer;
+
+ private final TypeInformation<Row> outputType;
+ private final DynamicTableSink.DataStructureConverter converter;
+
+ public UnsafeMemorySinkFunction(
+ TypeInformation<Row> outputType,
+ DynamicTableSink.DataStructureConverter converter) {
+ this.outputType = outputType;
+ this.converter = converter;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ serializer = outputType.createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ public void invoke(RowData value, Context context) throws Exception {
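+            // Convert back to the external Row format and deep-copy it, since the incoming
+            // RowData instance may be reused by the runtime.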
+ RESULT.add(serializer.copy((Row) converter.toExternal(value)));
+ }
+ }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java
new file mode 100644
index 00000000000..6b0a85b44ca
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Testing descriptors for Python tests. */
+public class TestingDescriptors {
+
+ /** CustomAssigner for testing. */
+ public static class CustomAssigner extends PunctuatedWatermarkAssigner {
+ @Override
+ public Watermark getWatermark(Row row, long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /** CustomExtractor for testing. */
+ public static class CustomExtractor extends TimestampExtractor {
+
+ private final String field;
+
+ public CustomExtractor(String field) {
+ this.field = field;
+ }
+
+ public CustomExtractor() {
+ this("ts");
+ }
+
+ @Override
+ public String[] getArgumentFields() {
+ return new String[] {field};
+ }
+
+ @Override
+ public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+ if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
+                throw new ValidationException(
+                        String.format(
+                                "Field '%s' must be of type Timestamp but is of type %s.",
+                                field, argumentFieldTypes[0]));
+ }
+ }
+
+ @Override
+ public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
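+            // Builds the rowtime expression CAST(<field> AS BIGINT) over the referenced
+            // timestamp field.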
+ ResolvedFieldReference fieldAccess = fieldAccesses[0];
+ checkState(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
+ FieldReferenceExpression fieldReferenceExpr =
+ new FieldReferenceExpression(
+ fieldAccess.name(),
+ TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
+ 0,
+ fieldAccess.fieldIndex());
+ return ApiExpressionUtils.unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ fieldReferenceExpr,
+ ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof CustomExtractor) {
+ return field.equals(((CustomExtractor) o).field);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(field);
+ }
+ }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java
new file mode 100644
index 00000000000..9bbbf570f53
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+/** Testing functions for Python UDF tests. */
+public class TestingFunctions {
+
+    /**
+     * RichFunc0 for testing; asserts that open(), eval() and close() are each called exactly
+     * once and in the expected order.
+     */
+ public static class RichFunc0 extends ScalarFunction {
+ private boolean openCalled = false;
+ private boolean closeCalled = false;
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ if (openCalled) {
+ Assert.fail("Open called more than once.");
+ } else {
+ openCalled = true;
+ }
+
+ if (closeCalled) {
+ Assert.fail("Close called before open.");
+ }
+ }
+
+ public Integer eval(Integer index) {
+ if (!openCalled) {
+ Assert.fail("Open was not called before eval.");
+ }
+
+ if (closeCalled) {
+ Assert.fail("Close called before eval.");
+ }
+
+ return index + 1;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (closeCalled) {
+ Assert.fail("Close called more than once.");
+ } else {
+ closeCalled = true;
+ }
+
+ if (!openCalled) {
+ Assert.fail("Open was not called before close.");
+ }
+ }
+ }
+
+    /** MaxAccumulator for testing: f0 holds the current maximum, f1 whether any value was seen. */
+ static class MaxAccumulator<T extends Comparable> extends Tuple2<T, Boolean> {}
+
+ /** MaxAggFunction for testing. */
+ abstract static class MaxAggFunction<T extends Comparable>
+ extends AggregateFunction<T, MaxAccumulator<T>> {
+ @Override
+ public T getValue(MaxAccumulator<T> accumulator) {
+ if (accumulator.f1) {
+ return accumulator.f0;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public MaxAccumulator<T> createAccumulator() {
+ MaxAccumulator<T> accumulator = new MaxAccumulator<>();
+ accumulator.f0 = getInitValue();
+ accumulator.f1 = false;
+ return accumulator;
+ }
+
+ public void accumulate(MaxAccumulator<T> acc, Object value) {
+ if (value != null) {
+ T t = (T) value;
+ if (!acc.f1 || acc.f0.compareTo(t) < 0) {
+ acc.f0 = t;
+ acc.f1 = true;
+ }
+ }
+ }
+
+        public void merge(MaxAccumulator<T> accumulator, Iterable<MaxAccumulator<T>> its) {
+            for (MaxAccumulator<T> other : its) {
+                if (other.f1) {
+                    accumulate(accumulator, other.f0);
+                }
+            }
+        }
+
+ public void resetAccumulator(MaxAccumulator<T> accumulator) {
+ accumulator.f0 = getInitValue();
+ accumulator.f1 = false;
+ }
+
+ abstract T getInitValue();
+
+ abstract TypeInformation<?> getValueTypeInfo();
+ }
+
+ /** ByteMaxAggFunction for testing. */
+ public static class ByteMaxAggFunction extends MaxAggFunction<Byte> {
+
+ @Override
+ Byte getInitValue() {
+ return 0;
+ }
+
+ @Override
+ TypeInformation<?> getValueTypeInfo() {
+ return Types.BYTE;
+ }
+ }
+
+ /** TableFunc1 for testing. */
+ public static class TableFunc1 extends TableFunction<String> {
+ public void eval(String str) {
+ if (str.contains("#")) {
+ Arrays.stream(str.split("#")).forEach(this::collect);
+ }
+ }
+
+ public void eval(String str, String prefix) {
+ if (str.contains("#")) {
+ Arrays.stream(str.split("#")).forEach(s -> collect(prefix + s));
+ }
+ }
+ }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java
new file mode 100644
index 00000000000..f27c0538425
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** TableFactory for TestingSinks. */
+public class TestingSinkTableFactory implements DynamicTableSinkFactory {
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
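+        // Rows written to a 'test-sink' table end up in TestingSinks.RowCollector for
+        // later assertions.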
+ return new TestingSinks.TestAppendingSink(context.getPhysicalRowDataType());
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "test-sink";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
new file mode 100644
index 00000000000..b7645a67a03
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Testing sinks that collect test output data for validation. */
+public class TestingSinks {
+
+ /** TestAppendingSink for testing. */
+ public static class TestAppendingSink implements DynamicTableSink {
+ private final DataType rowDataType;
+
+ public TestAppendingSink(DataType rowDataType) {
+ this.rowDataType = rowDataType;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return requestedMode;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final DataStructureConverter converter =
+ context.createDataStructureConverter(rowDataType);
+ return (DataStreamSinkProvider)
+ (providerContext, dataStream) -> dataStream.addSink(new RowSink(converter));
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new TestAppendingSink(rowDataType);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format("TestingAppendSink(%s)", DataType.getFields(rowDataType));
+ }
+ }
+
+ /** RowSink for testing. */
+ static class RowSink implements SinkFunction<RowData> {
+ private final DynamicTableSink.DataStructureConverter converter;
+
+ public RowSink(DynamicTableSink.DataStructureConverter converter) {
+ this.converter = converter;
+ }
+
+ @Override
+ public void invoke(RowData value, Context context) {
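+            // Encode the changelog as (accumulate flag, row): INSERT and UPDATE_AFTER map to
+            // true, UPDATE_BEFORE and DELETE to false.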
+ RowKind rowKind = value.getRowKind();
+ Row data = (Row) converter.toExternal(value);
+ if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
+ RowCollector.addValue(Tuple2.of(true, data));
+ } else {
+ RowCollector.addValue(Tuple2.of(false, data));
+ }
+ }
+ }
+
+ /** RowCollector for testing. */
+ public static class RowCollector {
+ private static final List<Tuple2<Boolean, Row>> SINK = new ArrayList<>();
+
+ public static void addValue(Tuple2<Boolean, Row> value) {
+ final Tuple2<Boolean, Row> copy = new Tuple2<>(value.f0, value.f1);
+ synchronized (SINK) {
+ SINK.add(copy);
+ }
+ }
+
+ public static List<Tuple2<Boolean, Row>> getAndClearValues() {
+ final List<Tuple2<Boolean, Row>> out = new ArrayList<>(SINK);
+ SINK.clear();
+ return out;
+ }
+
+ public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
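+            // Count +1 for every accumulate and -1 for every retraction, then materialize
+            // each remaining row as many times as its final count.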
+ Map<String, Integer> retractedResult = new HashMap<>();
+ results.forEach(
+ v -> {
+ int cnt = retractedResult.getOrDefault(v.f1.toString(), 0);
+ if (v.f0) {
+ retractedResult.put(v.f1.toString(), cnt + 1);
+ } else {
+ retractedResult.put(v.f1.toString(), cnt - 1);
+ }
+ });
+
+            if (retractedResult.entrySet().stream().anyMatch(entry -> entry.getValue() < 0)) {
+                throw new AssertionError(
+                        "Received retracted rows which have not been accumulated.");
+ }
+ List<String> retractedString = new ArrayList<>();
+ retractedResult.forEach(
+ (k, v) -> {
+ for (int i = 0; i < v; i++) {
+ retractedString.add(k);
+ }
+ });
+
+ return retractedString;
+ }
+
+ public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
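+            // Replay the upsert stream keyed by the given fields: the latest accumulate wins
+            // and a retraction removes the key.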
+ Map<Row, String> upsertResult = new HashMap<>();
+ results.forEach(
+ v -> {
+ Row key = Row.project(v.f1, keys);
+ if (v.f0) {
+ upsertResult.put(key, v.f1.toString());
+ } else {
+ upsertResult.remove(key);
+ }
+ });
+ return new ArrayList<>(upsertResult.values());
+ }
+ }
+}
diff --git a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 87%
rename from flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
rename to flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 511a31a088b..679e7f4a8c6 100644
--- a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.legacyutils.TestCollectionTableFactory
+org.apache.flink.table.utils.TestCollectionTableFactory
+org.apache.flink.table.utils.TestingSinkTableFactory
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index c6fa42a98df..b3dfb12730e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -39,8 +39,8 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
private final String name;
private final byte[] serializedScalarFunction;
- private final TypeInformation[] inputTypes;
- private final TypeInformation resultType;
+ private final DataType[] inputTypes;
+ private final DataType resultType;
private final PythonFunctionKind pythonFunctionKind;
private final boolean deterministic;
private final PythonEnv pythonEnv;
@@ -49,8 +49,8 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
public PythonScalarFunction(
String name,
byte[] serializedScalarFunction,
- TypeInformation[] inputTypes,
- TypeInformation resultType,
+ DataType[] inputTypes,
+ DataType resultType,
PythonFunctionKind pythonFunctionKind,
boolean deterministic,
boolean takesRowAsInput,
@@ -98,7 +98,7 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
@Override
public TypeInformation[] getParameterTypes(Class[] signature) {
if (inputTypes != null) {
- return inputTypes;
+ return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
} else {
return super.getParameterTypes(signature);
}
@@ -106,7 +106,7 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
@Override
public TypeInformation getResultType(Class[] signature) {
- return resultType;
+ return TypeConversions.fromDataTypeToLegacyInfo(resultType);
}
@Override
@@ -114,15 +114,10 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
TypeInference.Builder builder = TypeInference.newBuilder();
if (inputTypes != null) {
final List<DataType> argumentDataTypes =
- Stream.of(inputTypes)
- .map(TypeConversions::fromLegacyInfoToDataType)
- .collect(Collectors.toList());
+ Stream.of(inputTypes).collect(Collectors.toList());
builder.typedArguments(argumentDataTypes);
}
- return builder.outputTypeStrategy(
- TypeStrategies.explicit(
- TypeConversions.fromLegacyInfoToDataType(resultType)))
- .build();
+ return builder.outputTypeStrategy(TypeStrategies.explicit(resultType)).build();
}
@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index fc920aee7d9..29df9416efa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.functions.python;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
@@ -41,8 +40,8 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
private final String name;
private final byte[] serializedScalarFunction;
- private final TypeInformation[] inputTypes;
- private final RowTypeInfo resultType;
+ private final DataType[] inputTypes;
+ private final DataType resultType;
private final PythonFunctionKind pythonFunctionKind;
private final boolean deterministic;
private final PythonEnv pythonEnv;
@@ -51,8 +50,8 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
public PythonTableFunction(
String name,
byte[] serializedScalarFunction,
- TypeInformation[] inputTypes,
- RowTypeInfo resultType,
+ DataType[] inputTypes,
+ DataType resultType,
PythonFunctionKind pythonFunctionKind,
boolean deterministic,
boolean takesRowAsInput,
@@ -100,7 +99,7 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
@Override
public TypeInformation[] getParameterTypes(Class[] signature) {
if (inputTypes != null) {
- return inputTypes;
+ return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
} else {
return super.getParameterTypes(signature);
}
@@ -108,7 +107,7 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
@Override
public TypeInformation<Row> getResultType() {
- return resultType;
+ return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(resultType);
}
@Override
@@ -116,15 +115,10 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
TypeInference.Builder builder = TypeInference.newBuilder();
if (inputTypes != null) {
final List<DataType> argumentDataTypes =
- Stream.of(inputTypes)
- .map(TypeConversions::fromLegacyInfoToDataType)
- .collect(Collectors.toList());
+ Stream.of(inputTypes).collect(Collectors.toList());
builder.typedArguments(argumentDataTypes);
}
- return builder.outputTypeStrategy(
- TypeStrategies.explicit(
- TypeConversions.fromLegacyInfoToDataType(resultType)))
- .build();
+ return builder.outputTypeStrategy(TypeStrategies.explicit(resultType)).build();
}
@Override