You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/07/05 11:17:01 UTC

[flink] branch master updated: [FLINK-25231][python]Update Pyflink to use the new type system

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a951831ddda [FLINK-25231][python]Update Pyflink to use the new type system
a951831ddda is described below

commit a951831dddaa163e7d97d4118984d238bb0164a1
Author: acquachen <ac...@gmail.com>
AuthorDate: Fri Mar 18 10:23:03 2022 +0800

    [FLINK-25231][python]Update Pyflink to use the new type system
    
    This closes #19140.
---
 .../datastream/stream_execution_environment.py     |   9 +-
 flink-python/pyflink/table/descriptors.py          |   4 +-
 flink-python/pyflink/table/expressions.py          |  21 +-
 flink-python/pyflink/table/sinks.py                |  12 +-
 flink-python/pyflink/table/sources.py              |   5 +-
 flink-python/pyflink/table/table_environment.py    |  25 +-
 flink-python/pyflink/table/table_result.py         |   4 +-
 flink-python/pyflink/table/table_schema.py         |  15 +-
 flink-python/pyflink/table/tests/test_calc.py      |  38 +-
 flink-python/pyflink/table/tests/test_correlate.py |  20 +-
 .../pyflink/table/tests/test_dependency.py         |  38 +-
 .../pyflink/table/tests/test_descriptor.py         |  24 +-
 .../pyflink/table/tests/test_pandas_conversion.py  |  29 +-
 .../pyflink/table/tests/test_pandas_udaf.py        | 201 ++++----
 .../pyflink/table/tests/test_pandas_udf.py         |  38 +-
 .../table/tests/test_row_based_operation.py        |  58 +--
 flink-python/pyflink/table/tests/test_sql.py       |  20 +-
 .../table/tests/test_table_environment_api.py      |  39 +-
 flink-python/pyflink/table/tests/test_types.py     |  56 +--
 flink-python/pyflink/table/tests/test_udaf.py      |  95 ++--
 flink-python/pyflink/table/tests/test_udf.py       | 111 ++---
 flink-python/pyflink/table/tests/test_udtf.py      |  15 +-
 flink-python/pyflink/table/types.py                | 208 ++------
 flink-python/pyflink/table/udf.py                  |  12 +-
 flink-python/pyflink/testing/source_sink_utils.py  |  39 +-
 .../flink/streaming/api/utils/PythonTypeUtils.java | 531 +++++++++++++++++++++
 .../utils/python/PythonDynamicTableFactory.java    |  69 +++
 .../utils/python/PythonDynamicTableOptions.java    |  38 ++
 .../utils/python/PythonDynamicTableSource.java     |  72 +++
 .../utils/python/PythonInputFormatTableSource.java |  68 ---
 .../flink/table/utils/python/PythonTableUtils.java | 512 +++++++++-----------
 .../org.apache.flink.table.factories.Factory}      |   2 +-
 .../table/legacyutils/ByteMaxAggFunction.java      |  76 ---
 .../flink/table/legacyutils/CustomAssigner.java    |  32 --
 .../flink/table/legacyutils/CustomExtractor.java   |  69 ---
 .../flink/table/legacyutils/MaxAccumulator.java    |  25 -
 .../apache/flink/table/legacyutils/RichFunc0.java  |  69 ---
 .../flink/table/legacyutils/RowCollector.java      |  79 ---
 .../apache/flink/table/legacyutils/RowSink.java    |  32 --
 .../apache/flink/table/legacyutils/TableFunc1.java |  42 --
 .../flink/table/legacyutils/TestAppendSink.java    |  73 ---
 .../legacyutils/TestCollectionTableFactory.java    | 307 ------------
 .../flink/table/legacyutils/TestRetractSink.java   |  63 ---
 .../flink/table/legacyutils/TestUpsertSink.java    |  87 ----
 .../table/utils/TestCollectionTableFactory.java    | 321 +++++++++++++
 .../flink/table/utils/TestingDescriptors.java      | 109 +++++
 .../apache/flink/table/utils/TestingFunctions.java | 164 +++++++
 .../flink/table/utils/TestingSinkTableFactory.java |  50 ++
 .../org/apache/flink/table/utils/TestingSinks.java | 149 ++++++
 ...ry => org.apache.flink.table.factories.Factory} |   3 +-
 .../functions/python/PythonScalarFunction.java     |  21 +-
 .../functions/python/PythonTableFunction.java      |  22 +-
 52 files changed, 2246 insertions(+), 1975 deletions(-)

diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 8418f20bc01..7d5d9d4d4da 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -933,12 +933,11 @@ class StreamExecutionEnvironment(object):
             else:
                 j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name)
                 out_put_type_info = type_info
-            # Since flink python module depends on table module, we can make use of utils of it when
-            # implementing python DataStream API.
-            PythonTableUtils = gateway.jvm\
-                .org.apache.flink.table.utils.python.PythonTableUtils
+
+            PythonTypeUtils = gateway.jvm\
+                .org.apache.flink.streaming.api.utils.PythonTypeUtils
             execution_config = self._j_stream_execution_environment.getConfig()
-            j_input_format = PythonTableUtils.getCollectionInputFormat(
+            j_input_format = PythonTypeUtils.getCollectionInputFormat(
                 j_objs,
                 out_put_type_info.get_java_type_info(),
                 execution_config
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 1d1ea823330..c2c1e643a60 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -24,7 +24,7 @@ from typing import Dict, Union
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table.table_schema import TableSchema
-from pyflink.table.types import _to_java_type, DataType
+from pyflink.table.types import DataType, _to_java_data_type
 
 __all__ = [
     'Rowtime',
@@ -219,7 +219,7 @@ class Schema(Descriptor):
         if isinstance(field_type, str):
             self._j_schema = self._j_schema.field(field_name, field_type)
         else:
-            self._j_schema = self._j_schema.field(field_name, _to_java_type(field_type))
+            self._j_schema = self._j_schema.field(field_name, _to_java_data_type(field_type))
         return self
 
     def fields(self, fields: Dict[str, Union[DataType, str]]) -> 'Schema':
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index d870f825e20..503caf852fa 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -20,8 +20,8 @@ from typing import Union
 from pyflink import add_version_doc
 from pyflink.java_gateway import get_gateway
 from pyflink.table.expression import Expression, _get_java_expression, TimePointUnit, JsonOnNull
-from pyflink.table.types import _to_java_data_type, DataType, _to_java_type
-from pyflink.table.udf import UserDefinedFunctionWrapper, UserDefinedTableFunctionWrapper
+from pyflink.table.types import _to_java_data_type, DataType
+from pyflink.table.udf import UserDefinedFunctionWrapper
 from pyflink.util.java_utils import to_jarray, load_java_class
 
 __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOUNDED_ROW',
@@ -767,21 +767,6 @@ def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
         return Expression(gateway.jvm.Expressions.call(
             f, to_jarray(gateway.jvm.Object, [_get_java_expression(arg) for arg in args])))
 
-    def get_function_definition(f):
-        if isinstance(f, UserDefinedTableFunctionWrapper):
-            """
-            TypeInference was not supported for TableFunction in the old planner. Use
-            TableFunctionDefinition to work around this issue.
-            """
-            j_result_types = to_jarray(gateway.jvm.TypeInformation,
-                                       [_to_java_type(i) for i in f._result_types])
-            j_result_type = gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(
-                j_result_types)
-            return gateway.jvm.org.apache.flink.table.functions.TableFunctionDefinition(
-                'f', f._java_user_defined_function(), j_result_type)
-        else:
-            return f._java_user_defined_function()
-
     expressions_clz = load_java_class("org.apache.flink.table.api.Expressions")
     function_definition_clz = load_java_class('org.apache.flink.table.functions.FunctionDefinition')
     j_object_array_type = to_jarray(gateway.jvm.Object, []).getClass()
@@ -794,7 +779,7 @@ def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
     return Expression(api_call_method.invoke(
         None,
         to_jarray(gateway.jvm.Object,
-                  [get_function_definition(f),
+                  [f._java_user_defined_function(),
                    to_jarray(gateway.jvm.Object, [_get_java_expression(arg) for arg in args])])))
 
 
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 26df8a040f9..e6091ab0265 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -17,7 +17,7 @@
 ################################################################################
 
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type
+from pyflink.table.types import _to_java_data_type
 from pyflink.util import java_utils
 
 __all__ = ['TableSink', 'CsvTableSink', 'WriteMode']
@@ -68,11 +68,11 @@ class CsvTableSink(TableSink):
             j_write_mode = None
         else:
             raise Exception('Unsupported write_mode: %s' % write_mode)
-        j_csv_table_sink = gateway.jvm.CsvTableSink(
-            path, field_delimiter, num_files, j_write_mode)
         j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
         j_field_types = java_utils.to_jarray(
-            gateway.jvm.TypeInformation,
-            [_to_java_type(field_type) for field_type in field_types])
-        j_csv_table_sink = j_csv_table_sink.configure(j_field_names, j_field_types)
+            gateway.jvm.DataType,
+            [_to_java_data_type(field_type) for field_type in field_types])
+        j_csv_table_sink = gateway.jvm.CsvTableSink(
+            path, field_delimiter, num_files, j_write_mode, j_field_names, j_field_types)
+
         super(CsvTableSink, self).__init__(j_csv_table_sink)
diff --git a/flink-python/pyflink/table/sources.py b/flink-python/pyflink/table/sources.py
index cbce0be0d60..3e0df45e8ea 100644
--- a/flink-python/pyflink/table/sources.py
+++ b/flink-python/pyflink/table/sources.py
@@ -17,8 +17,7 @@
 ################################################################################
 
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type
-
+from pyflink.table.types import _to_java_data_type
 __all__ = ['TableSource', 'CsvTableSource']
 
 
@@ -82,7 +81,7 @@ class CsvTableSource(TableSource):
         builder.path(source_path)
 
         for (field_name, field_type) in zip(field_names, field_types):
-            builder.field(field_name, _to_java_type(field_type))
+            builder.field(field_name, _to_java_data_type(field_type))
 
         if field_delim is not None:
             builder.fieldDelimiter(field_delim)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 544b796bd0c..f7a49fe28c1 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import atexit
 import os
 import sys
 import tempfile
@@ -40,7 +41,7 @@ from pyflink.table.statement_set import StatementSet
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_descriptor import TableDescriptor
 from pyflink.table.table_result import TableResult
-from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
+from pyflink.table.types import _create_type_verifier, RowType, DataType, \
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema, \
     _to_java_data_type
 from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
@@ -1426,7 +1427,7 @@ class TableEnvironment(object):
         elements = [schema.to_sql_type(element) for element in elements]
         return self._from_elements(elements, schema)
 
-    def _from_elements(self, elements: List, schema: Union[DataType, List[str]]) -> Table:
+    def _from_elements(self, elements: List, schema: DataType) -> Table:
         """
         Creates a table from a collection of elements.
 
@@ -1439,22 +1440,15 @@ class TableEnvironment(object):
         try:
             with temp_file:
                 serializer.serialize(elements, temp_file)
-            row_type_info = _to_java_type(schema)
-            execution_config = self._get_j_env().getConfig()
+            j_schema = _to_java_data_type(schema)
             gateway = get_gateway()
-            j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
             PythonTableUtils = gateway.jvm \
                 .org.apache.flink.table.utils.python.PythonTableUtils
-            PythonInputFormatTableSource = gateway.jvm \
-                .org.apache.flink.table.utils.python.PythonInputFormatTableSource
-            j_input_format = PythonTableUtils.getInputFormat(
-                j_objs, row_type_info, execution_config)
-            j_table_source = PythonInputFormatTableSource(
-                j_input_format, row_type_info)
-
-            return Table(self._j_tenv.fromTableSource(j_table_source), self)
+            j_table = PythonTableUtils.createTableFromElement(
+                self._j_tenv, temp_file.name, j_schema, True)
+            return Table(j_table, self)
         finally:
-            os.unlink(temp_file.name)
+            atexit.register(lambda: os.unlink(temp_file.name))
 
     def from_pandas(self, pdf,
                     schema: Union[RowType, List[str], Tuple[str], List[DataType],
@@ -1526,8 +1520,7 @@ class TableEnvironment(object):
                 serializer.serialize(data, temp_file)
             jvm = get_gateway().jvm
 
-            data_type = jvm.org.apache.flink.table.types.utils.TypeConversions\
-                .fromLegacyInfoToDataType(_to_java_type(result_type)).notNull()
+            data_type = _to_java_data_type(result_type).notNull()
             data_type = data_type.bridgedTo(
                 load_java_class('org.apache.flink.table.data.RowData'))
 
diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
index 58666e09e28..6c982676d43 100644
--- a/flink-python/pyflink/table/table_result.py
+++ b/flink-python/pyflink/table/table_result.py
@@ -25,7 +25,7 @@ from pyflink.common.job_client import JobClient
 from pyflink.java_gateway import get_gateway
 from pyflink.table.result_kind import ResultKind
 from pyflink.table.table_schema import TableSchema
-from pyflink.table.types import _from_java_type
+from pyflink.table.types import _from_java_data_type
 from pyflink.table.utils import pickled_bytes_to_python_converter
 
 __all__ = ['TableResult', 'CloseableIterator']
@@ -226,7 +226,7 @@ class CloseableIterator(object):
     def __init__(self, j_closeable_iterator, field_data_types):
         self._j_closeable_iterator = j_closeable_iterator
         self._j_field_data_types = field_data_types
-        self._data_types = [_from_java_type(j_field_data_type)
+        self._data_types = [_from_java_data_type(j_field_data_type)
                             for j_field_data_type in self._j_field_data_types]
 
     def __iter__(self):
diff --git a/flink-python/pyflink/table/table_schema.py b/flink-python/pyflink/table/table_schema.py
index 3c4b5eab0e3..fd2dbd941fd 100644
--- a/flink-python/pyflink/table/table_schema.py
+++ b/flink-python/pyflink/table/table_schema.py
@@ -18,7 +18,7 @@
 from typing import List, Optional, Union
 
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_type, _from_java_type, DataType, RowType
+from pyflink.table.types import DataType, RowType, _to_java_data_type, _from_java_data_type
 from pyflink.util.java_utils import to_jarray
 
 __all__ = ['TableSchema']
@@ -34,9 +34,10 @@ class TableSchema(object):
         if j_table_schema is None:
             gateway = get_gateway()
             j_field_names = to_jarray(gateway.jvm.String, field_names)
-            j_data_types = to_jarray(gateway.jvm.TypeInformation,
-                                     [_to_java_type(item) for item in data_types])
-            self._j_table_schema = gateway.jvm.TableSchema(j_field_names, j_data_types)
+            j_data_types = to_jarray(gateway.jvm.DataType,
+                                     [_to_java_data_type(item) for item in data_types])
+            self._j_table_schema = gateway.jvm.TableSchema.builder()\
+                .fields(j_field_names, j_data_types).build()
         else:
             self._j_table_schema = j_table_schema
 
@@ -54,7 +55,7 @@ class TableSchema(object):
 
         :return: A list of all field data types.
         """
-        return [_from_java_type(item) for item in self._j_table_schema.getFieldDataTypes()]
+        return [_from_java_data_type(item) for item in self._j_table_schema.getFieldDataTypes()]
 
     def get_field_data_type(self, field: Union[int, str]) -> Optional[DataType]:
         """
@@ -67,7 +68,7 @@ class TableSchema(object):
             raise TypeError("Expected field index or field name, got %s" % type(field))
         optional_result = self._j_table_schema.getFieldDataType(field)
         if optional_result.isPresent():
-            return _from_java_type(optional_result.get())
+            return _from_java_data_type(optional_result.get())
         else:
             return None
 
@@ -107,7 +108,7 @@ class TableSchema(object):
 
         :return: The row data type.
         """
-        return _from_java_type(self._j_table_schema.toRowDataType())
+        return _from_java_data_type(self._j_table_schema.toRowDataType())
 
     def __repr__(self):
         return self._j_table_schema.toString()
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 4f12a5c0104..4fc7955986a 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -85,8 +85,30 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
             list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
                      field_names,
                      field_types)))
-        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
-        t_env.register_table_sink("Results", table_sink)
+
+        sink_table_ddl = """
+            CREATE TABLE Results(
+            a BIGINT,
+            b DOUBLE,
+            c STRING,
+            d STRING,
+            e DATE,
+            f TIME,
+            g TIMESTAMP(3),
+            h INT,
+            i ARRAY<DOUBLE>,
+            j ARRAY<DOUBLE NOT NULL>,
+            k ARRAY<STRING>,
+            l ARRAY<DATE>,
+            m DECIMAL(38, 18),
+            n ROW<a BIGINT, b DOUBLE>,
+            o MAP<STRING, DOUBLE>,
+            p BYTES,
+            q ARRAY<DOUBLE NOT NULL>)
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+
         t = t_env.from_elements(
             [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), datetime.time(1, 0, 0),
               datetime.datetime(1970, 1, 2, 0, 0),
@@ -98,8 +120,8 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
         t.execute_insert("Results").wait()
         actual = source_sink_utils.results()
 
-        expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00:00, 1970-01-02 00:00:00.0, '
-                    '86400000, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
+        expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00, 1970-01-02T00:00, '
+                    '86400, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
                     '1.000000000000000000, +I[1, 2.0], {key=1.0}, [65, 66, 67, 68], [3.0, 4.0]]']
         self.assert_equals(actual, expected)
 
@@ -113,8 +135,12 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
             list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
                      field_names,
                      field_types)))
-        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
-        t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(a BIGINT, b STRING, c FLOAT)
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+
         t = t_env.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)], schema)
         t.execute_insert("Results").wait()
         actual = source_sink_utils.results()
diff --git a/flink-python/pyflink/table/tests/test_correlate.py b/flink-python/pyflink/table/tests/test_correlate.py
index 6ac1d020166..f2616924c7c 100644
--- a/flink-python/pyflink/table/tests/test_correlate.py
+++ b/flink-python/pyflink/table/tests/test_correlate.py
@@ -23,8 +23,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
 
     def test_join_lateral(self):
         t_env = self.t_env
-        t_env.create_java_temporary_system_function("split",
-                                                    "org.apache.flink.table.legacyutils.TableFunc1")
+        t_env.create_java_temporary_system_function(
+            "split",
+            "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
         result = source.join_lateral(expr.call('split', source.words).alias('word'))
@@ -36,8 +37,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
 
     def test_join_lateral_with_join_predicate(self):
         t_env = self.t_env
-        t_env.create_java_temporary_system_function("split",
-                                                    "org.apache.flink.table.legacyutils.TableFunc1")
+        t_env.create_java_temporary_system_function(
+            "split",
+            "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
         result = source.join_lateral(expr.call('split', source.words).alias('word'),
@@ -51,8 +53,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
 
     def test_left_outer_join_lateral(self):
         t_env = self.t_env
-        t_env.create_java_temporary_system_function("split",
-                                                    "org.apache.flink.table.legacyutils.TableFunc1")
+        t_env.create_java_temporary_system_function(
+            "split",
+            "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
         result = source.left_outer_join_lateral(expr.call('split', source.words).alias('word'))
@@ -64,8 +67,9 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
 
     def test_left_outer_join_lateral_with_join_predicate(self):
         t_env = self.t_env
-        t_env.create_java_temporary_system_function("split",
-                                                    "org.apache.flink.table.legacyutils.TableFunc1")
+        t_env.create_java_temporary_system_function(
+            "split",
+            "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
         # only support "true" as the join predicate currently
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 025eba9e884..0e5ca9491a5 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -56,9 +56,10 @@ class DependencyTests(object):
 
         self.t_env.create_temporary_system_function(
             "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call("add_two", t.a), t.a).execute_insert("Results").wait()
 
@@ -82,9 +83,12 @@ class DependencyTests(object):
         self.t_env.create_temporary_system_function("add_from_file",
                                                     udf(add_from_file, DataTypes.BIGINT(),
                                                         DataTypes.BIGINT()))
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
 
@@ -130,9 +134,11 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
         self.st_env.create_temporary_system_function(
             "check_requirements",
             udf(check_requirements, DataTypes.BIGINT(), DataTypes.BIGINT()))
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.st_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+                """
+        self.st_env.execute_sql(sink_table_ddl)
+
         t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call('check_requirements', t.a), t.a).execute_insert("Results").wait()
 
@@ -176,9 +182,10 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
         self.st_env.create_temporary_system_function(
             "add_one",
             udf(add_one, DataTypes.BIGINT(), DataTypes.BIGINT()))
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.st_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.st_env.execute_sql(sink_table_ddl)
         t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call('add_one', t.a), t.a).execute_insert("Results").wait()
 
@@ -208,9 +215,10 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
             udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
                 DataTypes.BIGINT()))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.st_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.st_env.execute_sql(sink_table_ddl)
         t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(
             expr.call('check_python_exec', t.a),
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index e63429bad08..9bbdf96c744 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -42,18 +42,18 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
 
     def test_timestamps_from_extractor(self):
         rowtime = Rowtime().timestamps_from_extractor(
-            "org.apache.flink.table.legacyutils.CustomExtractor")
+            "org.apache.flink.table.utils.TestingDescriptors$CustomExtractor")
 
         properties = rowtime.to_properties()
         expected = {
             'rowtime.timestamps.type': 'custom',
             'rowtime.timestamps.class':
-                'org.apache.flink.table.legacyutils.CustomExtractor',
+                'org.apache.flink.table.utils.TestingDescriptors$CustomExtractor',
             'rowtime.timestamps.serialized':
-                'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvcl4'
-                'ozwVLIwG6AgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
-                '50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
-                'AACdHM'}
+                'rO0ABXNyAD9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnV0aWxzLlRlc3RpbmdEZXNjcmlwdG9ycyRDdXN0b2'
+                '1FeHRyYWN0b3K-MntVKO8Z7QIAAUwABWZpZWxkdAASTGphdmEvbGFuZy9TdHJpbmc7eHIAPm9yZy5hcGFj'
+                'aGUuZmxpbmsudGFibGUuc291cmNlcy50c2V4dHJhY3RvcnMuVGltZXN0YW1wRXh0cmFjdG9yX9WOqYhTbB'
+                'gCAAB4cHQAAnRz'}
         self.assertEqual(expected, properties)
 
     def test_watermarks_periodic_ascending(self):
@@ -80,18 +80,18 @@ class RowTimeDescriptorTests(PyFlinkTestCase):
 
     def test_watermarks_from_strategy(self):
         rowtime = Rowtime().watermarks_from_strategy(
-            "org.apache.flink.table.legacyutils.CustomAssigner")
+            "org.apache.flink.table.utils.TestingDescriptors$CustomAssigner")
 
         properties = rowtime.to_properties()
         expected = {
             'rowtime.watermarks.type': 'custom',
             'rowtime.watermarks.class':
-                'org.apache.flink.table.legacyutils.CustomAssigner',
+                'org.apache.flink.table.utils.TestingDescriptors$CustomAssigner',
             'rowtime.watermarks.serialized':
-                'rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUFzc2lnbmVyu_8'
-                'TLNBQBsACAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW'
-                '5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhY'
-                'mxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OWaT4CAAB4cA'}
+                'rO0ABXNyAD5vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnV0aWxzLlRlc3RpbmdEZXNjcmlwdG9ycyRDdXN0b2'
+                '1Bc3NpZ25lcsY_Xt96bBjDAgAAeHIAR29yZy5hcGFjaGUuZmxpbmsudGFibGUuc291cmNlcy53bXN0cmF0'
+                'ZWdpZXMuUHVuY3R1YXRlZFdhdGVybWFya0Fzc2lnbmVygVHOe6GlrvQCAAB4cgA9b3JnLmFwYWNoZS5mbG'
+                'luay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5XYXRlcm1hcmtTdHJhdGVned57foNjlmk-AgAAeHA'}
         self.assertEqual(expected, properties)
 
 
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 43c3b5e2d2d..703fb9b512f 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -121,17 +121,34 @@ class PandasConversionITTests(PandasConversionTestBase):
         self.assertEqual(self.data_type, table.get_schema().to_row_data_type())
 
         table = table.filter(table.f2 < 2)
-        table_sink = source_sink_utils.TestAppendSink(
-            self.data_type.field_names(),
-            self.data_type.field_types())
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(
+            f1 TINYINT,
+            f2 SMALLINT,
+            f3 INT,
+            f4 BIGINT,
+            f5 BOOLEAN,
+            f6 FLOAT,
+            f7 DOUBLE,
+            f8 STRING,
+            f9 BYTES,
+            f10 DECIMAL(38, 18),
+            f11 DATE,
+            f12 TIME,
+            f13 TIMESTAMP(3),
+            f14 ARRAY<STRING>,
+            f15 ROW<a INT, b STRING, c TIMESTAMP(3), d ARRAY<INT>>)
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+
         table.execute_insert("Results").wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
                            ["+I[1, 1, 1, 1, true, 1.1, 1.2, hello, [97, 97, 97], "
                             "1000000000000000000.010000000000000000, 2014-09-13, 01:00:01, "
-                            "1970-01-01 00:00:00.123, [hello, 中文], +I[1, hello, "
-                            "1970-01-01 00:00:00.123, [1, 2]]]"])
+                            "1970-01-01T00:00:00.123, [hello, 中文], +I[1, hello, "
+                            "1970-01-01T00:00:00.123, [1, 2]]]"])
 
     def test_to_pandas(self):
         table = self.t_env.from_pandas(self.pdf, self.data_type)
diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index ce65d5715b1..71cd6380f41 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -45,13 +45,10 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [DataTypes.TINYINT(), DataTypes.FLOAT(),
-             DataTypes.ROW(
-                 [DataTypes.FIELD("a", DataTypes.INT()),
-                  DataTypes.FIELD("b", DataTypes.INT())])])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b FLOAT,c ROW<a INT, b INT>) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         # general udf
         add = udf(lambda a: a + 1, result_type=DataTypes.INT())
         # pandas udf
@@ -78,12 +75,12 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a'],
-            [DataTypes.INT()])
+        sink_table_ddl = """
+        CREATE TABLE Results(a INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         min_add = udaf(lambda a, b, c: a.min() + b.min() + c.min(),
                        result_type=DataTypes.INT(), func_type="pandas")
-        self.t_env.register_table_sink("Results", table_sink)
         t.select(min_add(t.a, t.b, t.c)) \
             .execute_insert("Results") \
             .wait()
@@ -98,10 +95,11 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [DataTypes.TINYINT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.INT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b INT, c FLOAT, d INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+        self.t_env.get_config().get_configuration().set_string('python.metric.enabled', 'true')
         self.t_env.get_config().set('python.metric.enabled', 'true')
         self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                      result_type=DataTypes.INT(),
@@ -133,15 +131,10 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT()),
                  DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.FLOAT()
-            ])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TIMESTAMP(3), b TIMESTAMP(3), c FLOAT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
         tumble_window = Tumble.over(lit(1).hours) \
             .on(col("rowtime")) \
@@ -154,8 +147,8 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
 
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.2]",
-                            "+I[2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0]"])
+                           ["+I[2018-03-11T03:00, 2018-03-11T04:00, 2.2]",
+                            "+I[2018-03-11T04:00, 2018-03-11T05:00, 8.0]"])
 
     def test_slide_group_window_aggregate_function(self):
         import datetime
@@ -174,17 +167,11 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT()),
                  DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.FLOAT(),
-                DataTypes.INT()
-            ])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(
+        a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d FLOAT, e INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                      result_type=DataTypes.INT(),
                                                      func_type="pandas"))
@@ -204,15 +191,15 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0, 6]",
-                            "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.5, 7]",
-                            "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 5.5, 14]",
-                            "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0, 14]",
-                            "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1.0, 4]",
-                            "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0, 10]",
-                            "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 3.0, 10]",
-                            "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0, 7]",
-                            "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0, 7]"])
+                           ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 6]",
+                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5, 7]",
+                            "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5, 14]",
+                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0, 14]",
+                            "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0, 4]",
+                            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 10]",
+                            "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0, 10]",
+                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 7]",
+                            "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 7]"])
 
     def test_over_window_aggregate_function(self):
         import datetime
@@ -230,13 +217,12 @@ class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT()),
                  DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
-
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'],
-            [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT(), DataTypes.FLOAT(),
-             DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(),
-             DataTypes.FLOAT(), DataTypes.FLOAT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(
+            a TINYINT, b FLOAT, c INT, d FLOAT, e FLOAT, f FLOAT, g FLOAT, h FLOAT, i FLOAT,
+            j FLOAT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
         self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                      result_type=DataTypes.INT(),
@@ -325,15 +311,11 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
         """ % source_path
         self.t_env.execute_sql(source_table)
         t = self.t_env.from_path("source_table")
-
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.FLOAT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d FLOAT)
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Slide.over(lit(1).hours)
                  .every(lit(30).minutes)
                  .on(col("rowtime"))
@@ -344,15 +326,15 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0]",
-                            "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.5]",
-                            "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 5.5]",
-                            "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8.0]",
-                            "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1.0]",
-                            "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0]",
-                            "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 3.0]",
-                            "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2.0]",
-                            "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2.0]"])
+                           ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]",
+                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5]",
+                            "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5]",
+                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0]",
+                            "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0]",
+                            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
+                            "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0]",
+                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
+                            "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]"])
         os.remove(source_path)
 
     def test_sliding_group_window_over_proctime(self):
@@ -429,13 +411,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
         """ % source_path
         self.t_env.execute_sql(source_table)
         t = self.t_env.from_path("source_table")
-
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.FLOAT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Slide.over(row_interval(2))
                  .every(row_interval(1))
                  .on(t.protime)
@@ -489,15 +468,12 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
         self.t_env.execute_sql(source_table)
         t = self.t_env.from_path("source_table")
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.FLOAT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(
+        a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d TIMESTAMP(3), e FLOAT)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
             .group_by(t.a, t.b, col("w")) \
             .select(t.a,
@@ -509,10 +485,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual, [
-            "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.5]",
-            "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 2018-03-11 04:59:59.999, 8.0]",
-            "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.0]",
-            "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2018-03-11 03:59:59.999, 2.0]",
+            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.5]",
+            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 2018-03-11T04:59:59.999, 8.0]",
+            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
+            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
         ])
         os.remove(source_path)
 
@@ -559,12 +535,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
         self.t_env.execute_sql(source_table)
         t = self.t_env.from_path("source_table")
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.FLOAT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Tumble.over(row_interval(2)).on(t.protime).alias("w")) \
             .group_by(t.a, t.b, col("w")) \
             .select(t.a, mean_udaf(t.c).alias("b")) \
@@ -613,13 +587,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             )
         """ % source_path
         self.t_env.execute_sql(source_table)
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.FLOAT(),
-                DataTypes.SMALLINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.execute_sql("""
             insert into Results
             select a,
@@ -681,13 +652,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             )
         """ % source_path
         self.t_env.execute_sql(source_table)
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.FLOAT(),
-                DataTypes.SMALLINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.execute_sql("""
             insert into Results
             select a,
@@ -749,13 +717,10 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             )
         """ % source_path
         self.t_env.execute_sql(source_table)
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.FLOAT(),
-                DataTypes.SMALLINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b FLOAT, c SMALLINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         self.t_env.execute_sql("""
             insert into Results
             select a,
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 415b23d4690..71a3f1cb782 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -45,10 +45,10 @@ class PandasUDFITTests(object):
         # general Python UDF
         subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT, d BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         t.where(add_one(t.b) <= 3) \
@@ -208,17 +208,14 @@ class PandasUDFITTests(object):
                 'row_param.f4 of wrong type %s !' % type(row_param.f4[0])
             return row_param
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q',
-             'r', 's', 't', 'u'],
-            [DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.BIGINT(),
-             DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.FLOAT(), DataTypes.DOUBLE(),
-             DataTypes.STRING(), DataTypes.STRING(), DataTypes.BYTES(), DataTypes.DECIMAL(38, 18),
-             DataTypes.DECIMAL(38, 18), DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(3),
-             DataTypes.ARRAY(DataTypes.STRING()), DataTypes.ARRAY(DataTypes.TIMESTAMP(3)),
-             DataTypes.ARRAY(DataTypes.INT()),
-             DataTypes.ARRAY(DataTypes.STRING()), row_type])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(
+        a TINYINT, b SMALLINT, c INT, d BIGINT, e BOOLEAN, f BOOLEAN, g FLOAT, h DOUBLE, i STRING,
+        j StRING, k BYTES, l DECIMAL(38, 18), m DECIMAL(38, 18), n DATE, o TIME, p TIMESTAMP(3),
+        q ARRAY<STRING>, r ARRAY<TIMESTAMP(3)>, s ARRAY<INT>, t ARRAY<STRING>,
+        u ROW<f1 INT, f2 STRING, f3 TIMESTAMP(3), f4 ARRAY<INT>>) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements(
             [(1, 32767, -2147483648, 1, True, False, 1.0, 1.0, 'hello', '中文',
@@ -279,8 +276,8 @@ class PandasUDFITTests(object):
             ["+I[1, 32767, -2147483648, 1, true, false, 1.0, 1.0, hello, 中文, "
              "[102, 108, 105, 110, 107], 1000000000000000000.050000000000000000, "
              "1000000000000000000.059999999999999999, 2014-09-13, 01:00:01, "
-             "1970-01-02 00:00:00.123, [hello, 中文, null], [1970-01-02 00:00:00.123], "
-             "[1, 2], [hello, 中文, null], +I[1, hello, 1970-01-02 00:00:00.123, [1, 2]]]"])
+             "1970-01-02T00:00:00.123, [hello, 中文, null], [1970-01-02T00:00:00.123], "
+             "[1, 2], [hello, 中文, null], +I[1, hello, 1970-01-02T00:00:00.123, [1, 2]]]"])
 
     def test_invalid_pandas_udf(self):
 
@@ -324,9 +321,10 @@ class PandasUDFITTests(object):
                 (local_zoned_timestamp_param[0], local_datetime)
             return local_zoned_timestamp_param
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TIMESTAMP_LTZ(3)) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements(
             [(local_datetime,)],
diff --git a/flink-python/pyflink/table/tests/test_row_based_operation.py b/flink-python/pyflink/table/tests/test_row_based_operation.py
index 1c8734b1df8..8c99c381a99 100644
--- a/flink-python/pyflink/table/tests/test_row_based_operation.py
+++ b/flink-python/pyflink/table/tests/test_row_based_operation.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import unittest
+
 from pandas.util.testing import assert_frame_equal
 
 from pyflink.common import Row
@@ -35,10 +37,10 @@ class RowBasedOperationTests(object):
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         func = udf(lambda x: Row(a=x + 1, b=x * x), result_type=DataTypes.ROW(
             [DataTypes.FIELD("a", DataTypes.BIGINT()),
@@ -66,10 +68,10 @@ class RowBasedOperationTests(object):
                                  DataTypes.ROW([DataTypes.FIELD("c", DataTypes.INT()),
                                                 DataTypes.FIELD("d", DataTypes.INT())]))]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         def func(x):
             import pandas as pd
@@ -113,11 +115,11 @@ class RowBasedOperationTests(object):
                 [DataTypes.FIELD("a", DataTypes.TINYINT()),
                  DataTypes.FIELD("b", DataTypes.STRING())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f'],
-            [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),
-             DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d STRING, e BIGINT, f STRING)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
         def split(x):
@@ -146,10 +148,10 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b FLOAT, c INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.a.max()),
                            result_type=DataTypes.ROW(
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
@@ -172,10 +174,10 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
                  DataTypes.FIELD("b", DataTypes.SMALLINT()),
                  DataTypes.FIELD("c", DataTypes.INT())]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'],
-            [DataTypes.FLOAT(), DataTypes.INT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a FLOAT, b INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         pandas_udaf = udaf(lambda pd: Row(pd.b.mean(), pd.b.max()),
                            result_type=DataTypes.ROW(
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
@@ -189,6 +191,7 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[3.8, 8]"])
 
+    @unittest.skip("Not supported yet")
     def test_window_aggregate_with_pandas_udaf(self):
         import datetime
         from pyflink.table.window import Tumble
@@ -207,14 +210,11 @@ class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBatchTableTes
                  DataTypes.FIELD("c", DataTypes.INT()),
                  DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [
-                DataTypes.TIMESTAMP(3),
-                DataTypes.FLOAT(),
-                DataTypes.INT()
-            ])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TIMESTAMP(3), b FLOAT, c INT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
+        print(t.get_schema())
         pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                            result_type=DataTypes.ROW(
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 6158321efdb..5b51f62ee98 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,7 +21,7 @@ import subprocess
 
 from pyflink.find_flink_home import _find_flink_source_root
 from pyflink.java_gateway import get_gateway
-from pyflink.table import DataTypes, ResultKind
+from pyflink.table import ResultKind
 from pyflink.table import expressions as expr
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
@@ -42,11 +42,10 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
     def test_sql_query(self):
         t_env = self.t_env
         source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
+        sink_table_ddl = """
+        CREATE TABLE sinks(a BIGINT, b STRING, c STRING) WITH ('connector'='test-sink')
+        """
+        t_env.execute_sql(sink_table_ddl)
 
         result = t_env.sql_query("select a + 1, b, c from %s" % source)
         result.execute_insert("sinks").wait()
@@ -77,11 +76,10 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
         self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
         table_result.print()
 
-        field_names = ["k1", "k2", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
+        sink_table_ddl = """
+        CREATE TABLE sinks(k1 BIGINT, k2 INT, c STRING) WITH ('connector'='test-sink')
+        """
+        t_env.execute_sql(sink_table_ddl)
         table_result = t_env.execute_sql("insert into sinks select * from tbl")
         from pyflink.common.job_status import JobStatus
         from py4j.protocol import Py4JJavaError
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a3732ced477..4b3f47daadd 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -89,10 +89,11 @@ class TableEnvironmentTest(object):
             "python_scalar_func", udf(lambda i: i, result_type=DataTypes.INT()))
 
         t_env.register_java_function("scalar_func",
-                                     "org.apache.flink.table.legacyutils.RichFunc0")
+                                     "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
         t_env.register_java_function(
-            "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
-        t_env.register_java_function("table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+            "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
+        t_env.register_java_function("table_func",
+                                     "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
 
         actual = t_env.list_user_defined_functions()
         expected = ['python_scalar_func', 'scalar_func', 'agg_func', 'table_func']
@@ -157,11 +158,11 @@ class TableEnvironmentTest(object):
         t_env = self.t_env
 
         t_env.create_java_temporary_system_function(
-            "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+            "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
         t_env.create_java_function(
-            "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+            "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
         t_env.create_java_temporary_function(
-            "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+            "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         self.assert_equals(t_env.list_user_defined_functions(),
                            ['scalar_func', 'agg_func', 'table_func'])
 
@@ -280,10 +281,14 @@ class DataStreamConversionTestCases(PyFlinkTestCase):
                                                            Types.STRING()]))
         t_env = self.t_env
         table = t_env.from_data_stream(ds)
-        field_names = ['a', 'b', 'c']
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink("Sink",
-                                  source_sink_utils.TestAppendSink(field_names, field_types))
+        sink_table_ddl = """
+        CREATE TABLE Sink(a INT, b STRING, c STRING) WITH ('connector'='test-sink')
+        """
+        t_env.execute_sql(sink_table_ddl)
+        expr_sink_ddl = """
+        CREATE TABLE ExprSink(a INT, b STRING, c STRING) WITH ('connector'='test-sink')
+        """
+        t_env.execute_sql(expr_sink_ddl)
         table.execute_insert("Sink").wait()
         result = source_sink_utils.results()
         expected = ['+I[1, Hi, Hello]', '+I[2, Hello, Hi]']
@@ -292,8 +297,6 @@ class DataStreamConversionTestCases(PyFlinkTestCase):
         ds = ds.map(lambda x: x, Types.ROW([Types.INT(), Types.STRING(), Types.STRING()])) \
                .map(lambda x: x, Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
         table = t_env.from_data_stream(ds, col('a'), col('b'), col('c'))
-        t_env.register_table_sink("ExprSink",
-                                  source_sink_utils.TestAppendSink(field_names, field_types))
         table.execute_insert("ExprSink").wait()
         result = source_sink_utils.results()
         self.assert_equals(result, expected)
@@ -631,13 +634,13 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         t_env = self.t_env
 
         t_env.register_java_function(
-            "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+            "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
 
         t_env.register_java_function(
-            "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+            "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
 
         t_env.register_java_function(
-            "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+            "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
 
         actual = t_env.list_user_defined_functions()
         expected = ['scalar_func', 'agg_func', 'table_func']
@@ -709,11 +712,11 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         t_env = self.t_env
 
         t_env.create_java_temporary_system_function(
-            "scalar_func", "org.apache.flink.table.legacyutils.RichFunc0")
+            "scalar_func", "org.apache.flink.table.utils.TestingFunctions$RichFunc0")
         t_env.create_java_function(
-            "agg_func", "org.apache.flink.table.legacyutils.ByteMaxAggFunction")
+            "agg_func", "org.apache.flink.table.utils.TestingFunctions$ByteMaxAggFunction")
         t_env.create_java_temporary_function(
-            "table_func", "org.apache.flink.table.legacyutils.TableFunc1")
+            "table_func", "org.apache.flink.table.utils.TestingFunctions$TableFunc1")
         self.assert_equals(t_env.list_user_defined_functions(),
                            ['scalar_func', 'agg_func', 'table_func'])
 
diff --git a/flink-python/pyflink/table/tests/test_types.py b/flink-python/pyflink/table/tests/test_types.py
index 7520fd68a7a..d3bd37c6ffd 100644
--- a/flink-python/pyflink/table/tests/test_types.py
+++ b/flink-python/pyflink/table/tests/test_types.py
@@ -34,8 +34,8 @@ from pyflink.table.types import (_infer_schema_from_data, _infer_type,
                                  _array_type_mappings, _merge_type,
                                  _create_type_verifier, UserDefinedType, DataTypes, Row, RowField,
                                  RowType, ArrayType, BigIntType, VarCharType, MapType, DataType,
-                                 _to_java_type, _from_java_type, ZonedTimestampType,
-                                 LocalZonedTimestampType)
+                                 _from_java_data_type, ZonedTimestampType,
+                                 LocalZonedTimestampType, _to_java_data_type)
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 
 
@@ -816,9 +816,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
                       DataTypes.TIME(),
                       DataTypes.TIMESTAMP(3)]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
@@ -833,7 +833,7 @@ class DataTypeConvertTests(PyFlinkTestCase):
                       JDataTypes.CHAR(50).notNull(),
                       JDataTypes.DECIMAL(20, 10).notNull()]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         expected = [DataTypes.TIME(3, False),
                     DataTypes.TIMESTAMP(3).not_null(),
@@ -844,22 +844,6 @@ class DataTypeConvertTests(PyFlinkTestCase):
                     DataTypes.DECIMAL(20, 10, False)]
         self.assertEqual(converted_python_types, expected)
 
-        # Legacy type tests
-        Types = gateway.jvm.org.apache.flink.table.api.Types
-        InternalBigDecimalTypeInfo = \
-            gateway.jvm.org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo
-
-        java_types = [Types.STRING(),
-                      Types.DECIMAL(),
-                      InternalBigDecimalTypeInfo(12, 5)]
-
-        converted_python_types = [_from_java_type(item) for item in java_types]
-
-        expected = [DataTypes.VARCHAR(2147483647),
-                    DataTypes.DECIMAL(38, 18),
-                    DataTypes.DECIMAL(12, 5)]
-        self.assertEqual(converted_python_types, expected)
-
     def test_array_type(self):
         # nullable/not_null flag will be lost during the conversion.
         test_types = [DataTypes.ARRAY(DataTypes.BIGINT()),
@@ -868,9 +852,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
                       DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.BIGINT())),
                       DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
@@ -880,9 +864,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
                       DataTypes.MULTISET(DataTypes.MULTISET(DataTypes.BIGINT())),
                       DataTypes.MULTISET(DataTypes.MULTISET(DataTypes.STRING()))]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
@@ -894,9 +878,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
                       DataTypes.MAP(DataTypes.STRING(),
                                     DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
@@ -907,9 +891,9 @@ class DataTypeConvertTests(PyFlinkTestCase):
                                                          [DataTypes.FIELD("c",
                                                                           DataTypes.STRING())]))])]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
@@ -917,9 +901,19 @@ class DataTypeConvertTests(PyFlinkTestCase):
         test_types = [DataTypes.LIST_VIEW(DataTypes.BIGINT()),
                       DataTypes.LIST_VIEW(DataTypes.STRING())]
 
-        java_types = [_to_java_type(item) for item in test_types]
+        java_types = [_to_java_data_type(item) for item in test_types]
+
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
+
+        self.assertEqual(test_types, converted_python_types)
+
+    def test_map_view_type(self):
+        test_types = [DataTypes.MAP_VIEW(DataTypes.STRING(), DataTypes.BIGINT()),
+                      DataTypes.MAP_VIEW(DataTypes.INT(), DataTypes.STRING())]
+
+        java_types = [_to_java_data_type(item) for item in test_types]
 
-        converted_python_types = [_from_java_type(item) for item in java_types]
+        converted_python_types = [_from_java_data_type(item) for item in java_types]
 
         self.assertEqual(test_types, converted_python_types)
 
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index 59615092f1e..e6f96527ff7 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -570,15 +570,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
         t = self.t_env.from_path("source_table")
 
         from pyflink.testing import source_sink_utils
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.BIGINT(),
-                DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT, e BIGINT)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
             .group_by(t.a, col("w")) \
             .select(t.a,
@@ -590,10 +586,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2, 1]",
-                            "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 1, 1]",
-                            "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2, 2]",
-                            "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 1, 1]"])
+                           ["+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2, 1]",
+                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 1, 1]",
+                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2, 2]",
+                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 1, 1]"])
 
     def test_tumbling_group_window_over_count(self):
         self.t_env.get_config().set("parallelism.default", "1")
@@ -633,12 +629,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
         t = self.t_env.from_path("source_table")
 
         from pyflink.testing import source_sink_utils
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, d BIGINT)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Tumble.over(row_interval(2)).on(t.protime).alias("w")) \
             .group_by(t.a, col("w")) \
             .select(t.a, call("my_sum", t.c).alias("b")) \
@@ -684,14 +679,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
         t = self.t_env.from_path("source_table")
 
         from pyflink.testing import source_sink_utils
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Slide.over(lit(1).hours)
                  .every(lit(30).minutes)
                  .on(t.rowtime)
@@ -702,15 +694,15 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[1, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2]",
-                            "+I[2, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 1]",
-                            "+I[3, 2018-03-11 02:30:00.0, 2018-03-11 03:30:00.0, 2]",
-                            "+I[1, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 5]",
-                            "+I[3, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2]",
-                            "+I[2, 2018-03-11 03:00:00.0, 2018-03-11 04:00:00.0, 2]",
-                            "+I[2, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 1]",
-                            "+I[1, 2018-03-11 03:30:00.0, 2018-03-11 04:30:00.0, 11]",
-                            "+I[1, 2018-03-11 04:00:00.0, 2018-03-11 05:00:00.0, 8]"])
+                           ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2]",
+                            "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1]",
+                            "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2]",
+                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 5]",
+                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2]",
+                            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2]",
+                            "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 1]",
+                            "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 11]",
+                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8]"])
 
     def test_sliding_group_window_over_count(self):
         self.t_env.get_config().set("parallelism.default", "1")
@@ -750,12 +742,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
         t = self.t_env.from_path("source_table")
 
         from pyflink.testing import source_sink_utils
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, d BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Slide.over(row_interval(2)).every(row_interval(1)).on(t.protime).alias("w")) \
             .group_by(t.a, col("w")) \
             .select(t.a, call("my_sum", t.c).alias("b")) \
@@ -801,14 +791,11 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
         t = self.t_env.from_path("source_table")
 
         from pyflink.testing import source_sink_utils
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [
-                DataTypes.TINYINT(),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.TIMESTAMP(3),
-                DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d BIGINT)
+        WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t.window(Session.with_gap(lit(30).minutes).on(t.rowtime).alias("w")) \
             .group_by(t.a, t.b, col("w")) \
             .select(t.a, col("w").start, col("w").end, call("my_count", t.c).alias("c")) \
@@ -816,10 +803,10 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
             .wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual,
-                           ["+I[3, 2018-03-11 03:10:00.0, 2018-03-11 03:40:00.0, 1]",
-                            "+I[2, 2018-03-11 03:10:00.0, 2018-03-11 04:00:00.0, 2]",
-                            "+I[1, 2018-03-11 03:10:00.0, 2018-03-11 04:10:00.0, 2]",
-                            "+I[1, 2018-03-11 04:20:00.0, 2018-03-11 04:50:00.0, 1]"])
+                           ["+I[3, 2018-03-11T03:10, 2018-03-11T03:40, 1]",
+                            "+I[2, 2018-03-11T03:10, 2018-03-11T04:00, 2]",
+                            "+I[1, 2018-03-11T03:10, 2018-03-11T04:10, 2]",
+                            "+I[1, 2018-03-11T04:20, 2018-03-11T04:50, 1]"])
 
     @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_group_aggregate_from_json_plan(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 91e418e9083..c53f8cbe08c 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -60,11 +60,11 @@ class UserDefinedFunctionTests(object):
                 assert os.environ['_PYTHON_WORKER_MEMORY_LIMIT'] is not None
             return 1
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(),
-             DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT, d BIGINT, e BIGINT, f BIGINT,
+             g BIGINT) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         execution_mode = self.t_env.get_config().get("python.execution-mode", "process")
 
@@ -80,10 +80,10 @@ class UserDefinedFunctionTests(object):
         add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
         subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                CREATE TABLE Results(a BIGINT, b BIGINT, c INT) WITH ('connector'='test-sink')
+                """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
         t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1)) \
@@ -97,11 +97,11 @@ class UserDefinedFunctionTests(object):
 
         f = udf(lambda i: i, result_type=DataTypes.BIGINT())
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
-        self.t_env.register_table_sink("Results", table_sink)
-
+        sink_table_ddl = """
+            CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d StRING)
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
         t1.join(t2).where(f(t1.a) == t2.c).execute_insert("Results").wait()
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[2, Hi, 2, Flink]"])
@@ -112,10 +112,10 @@ class UserDefinedFunctionTests(object):
 
         f = udf(lambda i: i, result_type=DataTypes.BIGINT())
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd'],
-            [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a BIGINT, b STRING, c BIGINT, d STRING) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t1.join(t2).where(f(t1.a) == f(t2.c)).execute_insert("Results").wait()
         actual = source_sink_utils.results()
@@ -180,9 +180,10 @@ class UserDefinedFunctionTests(object):
             "udf_with_all_constant_params", udf(lambda i, j: i + j,
                                                 result_type=DataTypes.BIGINT()))
 
-        table_sink = source_sink_utils.TestAppendSink(['a', 'b'],
-                                                      [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+                """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         self.t_env.register_table("test_table", t)
@@ -212,8 +213,10 @@ class UserDefinedFunctionTests(object):
             "plus", udf(lambda i, j: i + j - 1,
                         result_type=DataTypes.BIGINT()))
 
-        table_sink = source_sink_utils.TestAppendSink(['a'], [DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                        CREATE TABLE Results(a BIGINT) WITH ('connector'='test-sink')
+                        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         t.select(t.a + t.b).execute_insert("Results").wait()
@@ -229,9 +232,10 @@ class UserDefinedFunctionTests(object):
         else:
             subtract = udf(Subtract(), result_type=DataTypes.BIGINT())
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+                        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 4)], ['a', 'b'])
         t.select(t.a, subtract(t.b)).execute_insert("Results").wait()
@@ -242,9 +246,10 @@ class UserDefinedFunctionTests(object):
         one = udf(lambda: 1, result_type=DataTypes.BIGINT(), deterministic=True)
         two = udf(lambda: 2, result_type=DataTypes.BIGINT(), deterministic=False)
 
-        table_sink = source_sink_utils.TestAppendSink(['a', 'b'],
-                                                      [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+                        CREATE TABLE Results(a BIGINT, b BIGINT) WITH ('connector'='test-sink')
+                        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(one(), two()).execute_insert("Results").wait()
@@ -360,16 +365,13 @@ class UserDefinedFunctionTests(object):
                 'decimal_param is wrong value %s !' % decimal_param
             return decimal_param
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.TINYINT(),
-             DataTypes.BOOLEAN(), DataTypes.SMALLINT(), DataTypes.INT(),
-             DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.BYTES(),
-             DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIME(),
-             DataTypes.TIMESTAMP(3), DataTypes.ARRAY(DataTypes.BIGINT()),
-             DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()),
-             DataTypes.DECIMAL(38, 18), DataTypes.DECIMAL(38, 18)])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(
+            a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE, i BYTES,
+            j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>, o MAP<BIGINT, STRING>,
+            p DECIMAL(38, 18), q DECIMAL(38, 18)) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         import datetime
         import decimal
@@ -422,8 +424,8 @@ class UserDefinedFunctionTests(object):
         # Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
         self.assert_equals(actual,
                            ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
-                            "[102, 108, 105, 110, 107], pyflink, 2014-09-13, "
-                            "12:00:00, 2018-03-11 03:00:00.123, [1, 2, 3], "
+                            "[102, 108, 105, 110, 107], pyflink, 2014-09-13, 12:00:00.123, "
+                            "2018-03-11T03:00:00.123, [1, 2, 3], "
                             "{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
                             "1000000000000000000.059999999999999999]"])
 
@@ -571,16 +573,14 @@ class UserDefinedFunctionTests(object):
         self.t_env.register_function(
             "decimal_cut_func", udf(decimal_cut_func, result_type=DataTypes.DECIMAL(38, 18)))
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.TINYINT(),
-             DataTypes.BOOLEAN(), DataTypes.SMALLINT(), DataTypes.INT(),
-             DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.BYTES(),
-             DataTypes.STRING(), DataTypes.DATE(), DataTypes.TIME(),
-             DataTypes.TIMESTAMP(3), DataTypes.ARRAY(DataTypes.BIGINT()),
-             DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()),
-             DataTypes.DECIMAL(38, 18), DataTypes.DECIMAL(38, 18)])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+            CREATE TABLE Results(
+            a BIGINT, b BIGINT, c TINYINT, d BOOLEAN, e SMALLINT, f INT, g FLOAT, h DOUBLE,
+            i BYTES, j STRING, k DATE, l TIME, m TIMESTAMP(3), n ARRAY<BIGINT>,
+            o MAP<BIGINT, STRING>, p DECIMAL(38, 18), q DECIMAL(38, 18))
+            WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         import datetime
         import decimal
@@ -625,7 +625,7 @@ class UserDefinedFunctionTests(object):
         self.assert_equals(actual,
                            ["+I[1, null, 1, true, 32767, -2147483648, 1.23, 1.98932, "
                             "[102, 108, 105, 110, 107], pyflink, 2014-09-13, "
-                            "12:00:00, 2018-03-11 03:00:00.123, [1, 2, 3], "
+                            "12:00:00.123, 2018-03-11T03:00:00.123, [1, 2, 3], "
                             "{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
                             "1000000000000000000.059999999999999999]"])
 
@@ -726,9 +726,10 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                 'local_zoned_timestamp_param is wrong value %s !' % local_zoned_timestamp_param
             return local_zoned_timestamp_param
 
-        table_sink = source_sink_utils.TestAppendSink(
-            ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
-        self.t_env.register_table_sink("Results", table_sink)
+        sink_table_ddl = """
+        CREATE TABLE Results(a TIMESTAMP_LTZ(3)) WITH ('connector'='test-sink')
+        """
+        self.t_env.execute_sql(sink_table_ddl)
 
         t = self.t_env.from_elements(
             [(local_datetime,)],
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 9b1a011995f..087d56de200 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -28,9 +28,8 @@ from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
 class UserDefinedTableFunctionTests(object):
 
     def test_table_function(self):
-        self._register_table_sink(
-            ['a', 'b', 'c'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
+        self._register_table_sink("""CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT)
+                WITH ('connector'='test-sink')""")
 
         multi_emit = udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
         multi_num = udf(MultiNum(), result_type=DataTypes.BIGINT())
@@ -48,9 +47,8 @@ class UserDefinedTableFunctionTests(object):
                             "+I[3, 1, 2]", "+I[3, 2, 2]", "+I[3, 3, null]"])
 
     def test_table_function_with_sql_query(self):
-        self._register_table_sink(
-            ['a', 'b', 'c'],
-            [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()])
+        self._register_table_sink("""CREATE TABLE Results(a BIGINT, b BIGINT, c BIGINT)
+                        WITH ('connector'='test-sink')""")
 
         self.t_env.create_temporary_system_function(
             "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]))
@@ -63,9 +61,8 @@ class UserDefinedTableFunctionTests(object):
         actual = self._get_output(t)
         self.assert_equals(actual, ["+I[1, 1, 0]", "+I[2, 2, 0]", "+I[3, 3, 0]", "+I[3, 3, 1]"])
 
-    def _register_table_sink(self, field_names: list, field_types: list):
-        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
-        self.t_env.register_table_sink("Results", table_sink)
+    def _register_table_sink(self, ddl: str):
+        self.t_env.execute_sql(ddl)
 
     def _get_output(self, t):
         t.execute_insert("Results").wait()
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 44458da9b3c..add5b042f7d 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1681,161 +1681,15 @@ _primitive_array_element_types = {BooleanType, TinyIntType, SmallIntType, IntTyp
                                   FloatType, DoubleType}
 
 
-def _to_java_type(data_type):
+def _from_java_data_type(j_data_type):
     """
-    Converts Python type to Java TypeInformation.
-    """
-
-    global _python_java_types_mapping
-    global _python_java_types_mapping_lock
-
-    gateway = get_gateway()
-    Types = gateway.jvm.org.apache.flink.table.api.Types
-
-    if _python_java_types_mapping is None:
-        with _python_java_types_mapping_lock:
-            _python_java_types_mapping = {
-                BooleanType: Types.BOOLEAN(),
-                TinyIntType: Types.BYTE(),
-                SmallIntType: Types.SHORT(),
-                IntType: Types.INT(),
-                BigIntType: Types.LONG(),
-                FloatType: Types.FLOAT(),
-                DoubleType: Types.DOUBLE(),
-                DateType: Types.SQL_DATE(),
-            }
-
-    # basic types
-    if type(data_type) in _python_java_types_mapping:
-        return _python_java_types_mapping[type(data_type)]
-
-    # DecimalType
-    elif isinstance(data_type, DecimalType):
-        if data_type.precision == 38 and data_type.scale == 18:
-            return Types.DECIMAL()
-        else:
-            raise TypeError("The precision must be 38 and the scale must be 18 for DecimalType, "
-                            "got %s" % repr(data_type))
-
-    # TimeType
-    elif isinstance(data_type, TimeType):
-        if data_type.precision == 0:
-            return Types.SQL_TIME()
-        else:
-            raise TypeError("The precision must be 0 for TimeType, got %s" % repr(data_type))
-
-    # TimestampType
-    elif isinstance(data_type, TimestampType):
-        if data_type.precision == 3:
-            return Types.SQL_TIMESTAMP()
-        else:
-            raise TypeError("The precision must be 3 for TimestampType, got %s" % repr(data_type))
-
-    # LocalZonedTimestampType
-    elif isinstance(data_type, LocalZonedTimestampType):
-        if data_type.precision == 3:
-            return gateway.jvm.org.apache.flink.api.common.typeinfo.Types.INSTANT
-        else:
-            raise TypeError("The precision must be 3 for LocalZonedTimestampType, got %s"
-                            % repr(data_type))
-
-    # VarCharType
-    elif isinstance(data_type, VarCharType):
-        if data_type.length == 0x7fffffff:
-            return Types.STRING()
-        else:
-            raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarCharType, "
-                            "got %s" % repr(data_type))
-
-    # VarBinaryType
-    elif isinstance(data_type, VarBinaryType):
-        if data_type.length == 0x7fffffff:
-            return Types.PRIMITIVE_ARRAY(Types.BYTE())
-        else:
-            raise TypeError("The length limit must be 0x7fffffff(2147483647) for VarBinaryType, "
-                            "got %s" % repr(data_type))
-
-    # YearMonthIntervalType
-    elif isinstance(data_type, YearMonthIntervalType):
-        if data_type.resolution == YearMonthIntervalType.YearMonthResolution.MONTH and \
-                data_type.precision == 2:
-            return Types.INTERVAL_MONTHS()
-        else:
-            raise TypeError("The resolution must be YearMonthResolution.MONTH and the precision "
-                            "must be 2 for YearMonthIntervalType, got %s" % repr(data_type))
-
-    # DayTimeIntervalType
-    elif isinstance(data_type, DayTimeIntervalType):
-        if data_type.resolution == DayTimeIntervalType.DayTimeResolution.SECOND and \
-                data_type.day_precision == 2 and data_type.fractional_precision == 3:
-            return Types.INTERVAL_MILLIS()
-        else:
-            raise TypeError("The resolution must be DayTimeResolution.SECOND, the day_precision "
-                            "must be 2 and the fractional_precision must be 3 for "
-                            "DayTimeIntervalType, got %s" % repr(data_type))
-
-    # ArrayType
-    elif isinstance(data_type, ArrayType):
-        return Types.OBJECT_ARRAY(_to_java_type(data_type.element_type))
-
-    # ListViewType
-    elif isinstance(data_type, ListViewType):
-        return gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo(_to_java_type(
-            data_type._element_type))
-
-    # MapType
-    elif isinstance(data_type, MapType):
-        return Types.MAP(_to_java_type(data_type.key_type), _to_java_type(data_type.value_type))
-
-    # MapViewType
-    elif isinstance(data_type, MapViewType):
-        return gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo(
-            _to_java_type(data_type._key_type), _to_java_type(data_type._value_type))
-
-    # MultisetType
-    elif isinstance(data_type, MultisetType):
-        return Types.MULTISET(_to_java_type(data_type.element_type))
-
-    # RowType
-    elif isinstance(data_type, RowType):
-        return Types.ROW(
-            to_jarray(gateway.jvm.String, data_type.field_names()),
-            to_jarray(gateway.jvm.TypeInformation,
-                      [_to_java_type(f.data_type) for f in data_type.fields]))
-
-    # UserDefinedType
-    elif isinstance(data_type, UserDefinedType):
-        if data_type.java_udt():
-            return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
-                gateway.jvm.Class.forName(
-                    data_type.java_udt(),
-                    True,
-                    gateway.jvm.Thread.currentThread().getContextClassLoader()))
-        else:
-            return _to_java_type(data_type.sql_type())
-
-    else:
-        raise TypeError("Not supported type: %s" % repr(data_type))
-
-
-def _from_java_type(j_data_type):
-    """
-    Converts Java TypeInformation to Python DataType.
+    Converts Java DataType to Python DataType.
     """
     gateway = get_gateway()
 
-    if is_instance_of(j_data_type, gateway.jvm.TypeInformation):
-        # input is TypeInformation
-        LegacyTypeInfoDataTypeConverter = \
-            gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
-        java_data_type = LegacyTypeInfoDataTypeConverter.toDataType(j_data_type)
-    else:
-        # input is DataType
-        java_data_type = j_data_type
-
     # Atomic Type with parameters.
-    if is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
-        logical_type = java_data_type.getLogicalType()
+    if is_instance_of(j_data_type, gateway.jvm.AtomicDataType):
+        logical_type = j_data_type.getLogicalType()
         if is_instance_of(logical_type, gateway.jvm.CharType):
             data_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
         elif is_instance_of(logical_type, gateway.jvm.VarCharType):
@@ -1892,12 +1746,12 @@ def _from_java_type(j_data_type):
                 data_type = DataTypes.DECIMAL(type_info.precision(), type_info.scale())
             elif type_info.getClass() == \
                     get_java_class(gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo):
-                data_type = DataTypes.LIST_VIEW(_from_java_type(type_info.getElementType()))
+                data_type = DataTypes.LIST_VIEW(_from_java_data_type(type_info.getElementType()))
             elif type_info.getClass() == \
                     get_java_class(gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo):
                 data_type = DataTypes.MAP_VIEW(
-                    _from_java_type(type_info.getKeyType()),
-                    _from_java_type(type_info.getValueType()))
+                    _from_java_data_type(type_info.getKeyType()),
+                    _from_java_data_type(type_info.getValueType()))
             else:
                 raise TypeError("Unsupported type: %s, it is recognized as a legacy type."
                                 % type_info)
@@ -1910,13 +1764,14 @@ def _from_java_type(j_data_type):
         return data_type
 
     # Array Type, MultiSet Type.
-    elif is_instance_of(java_data_type, gateway.jvm.CollectionDataType):
-        logical_type = java_data_type.getLogicalType()
-        element_type = java_data_type.getElementDataType()
+    elif is_instance_of(j_data_type, gateway.jvm.CollectionDataType):
+        logical_type = j_data_type.getLogicalType()
+        element_type = j_data_type.getElementDataType()
         if is_instance_of(logical_type, gateway.jvm.ArrayType):
-            data_type = DataTypes.ARRAY(_from_java_type(element_type), logical_type.isNullable())
+            data_type = DataTypes.ARRAY(_from_java_data_type(element_type),
+                                        logical_type.isNullable())
         elif is_instance_of(logical_type, gateway.jvm.MultisetType):
-            data_type = DataTypes.MULTISET(_from_java_type(element_type),
+            data_type = DataTypes.MULTISET(_from_java_data_type(element_type),
                                            logical_type.isNullable())
         else:
             raise TypeError("Unsupported collection data type: %s" % j_data_type)
@@ -1924,14 +1779,14 @@ def _from_java_type(j_data_type):
         return data_type
 
     # Map Type.
-    elif is_instance_of(java_data_type, gateway.jvm.KeyValueDataType):
-        logical_type = java_data_type.getLogicalType()
-        key_type = java_data_type.getKeyDataType()
-        value_type = java_data_type.getValueDataType()
+    elif is_instance_of(j_data_type, gateway.jvm.KeyValueDataType):
+        logical_type = j_data_type.getLogicalType()
+        key_type = j_data_type.getKeyDataType()
+        value_type = j_data_type.getValueDataType()
         if is_instance_of(logical_type, gateway.jvm.MapType):
             data_type = DataTypes.MAP(
-                _from_java_type(key_type),
-                _from_java_type(value_type),
+                _from_java_data_type(key_type),
+                _from_java_data_type(value_type),
                 logical_type.isNullable())
         else:
             raise TypeError("Unsupported map data type: %s" % j_data_type)
@@ -1939,13 +1794,21 @@ def _from_java_type(j_data_type):
         return data_type
 
     # Row Type.
-    elif is_instance_of(java_data_type, gateway.jvm.FieldsDataType):
-        logical_type = java_data_type.getLogicalType()
-        field_data_types = java_data_type.getChildren()
+    elif is_instance_of(j_data_type, gateway.jvm.FieldsDataType):
+        logical_type = j_data_type.getLogicalType()
+        field_data_types = j_data_type.getChildren()
         if is_instance_of(logical_type, gateway.jvm.RowType):
-            fields = [DataTypes.FIELD(name, _from_java_type(field_data_types[idx]))
+            fields = [DataTypes.FIELD(name, _from_java_data_type(field_data_types[idx]))
                       for idx, name in enumerate(logical_type.getFieldNames())]
             data_type = DataTypes.ROW(fields, logical_type.isNullable())
+        elif j_data_type.getConversionClass().isAssignableFrom(
+                gateway.jvm.org.apache.flink.table.api.dataview.ListView._java_lang_class):
+            array_type = _from_java_data_type(field_data_types[0])
+            data_type = DataTypes.LIST_VIEW(array_type.element_type)
+        elif j_data_type.getConversionClass().isAssignableFrom(
+                gateway.jvm.org.apache.flink.table.api.dataview.MapView._java_lang_class):
+            map_type = _from_java_data_type(field_data_types[0])
+            data_type = DataTypes.MAP_VIEW(map_type.key_type, map_type.value_type)
         else:
             raise TypeError("Unsupported row data type: %s" % j_data_type)
 
@@ -2007,6 +1870,15 @@ def _to_java_data_type(data_type: DataType):
         fields = [JDataTypes.FIELD(f.name, _to_java_data_type(f.data_type))
                   for f in data_type.fields]
         j_data_type = JDataTypes.ROW(to_jarray(JDataTypes.Field, fields))
+    elif isinstance(data_type, UserDefinedType):
+        if data_type.java_udt():
+            return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
+                gateway.jvm.Class.forName(
+                    data_type.java_udt(),
+                    True,
+                    gateway.jvm.Thread.currentThread().getContextClassLoader()))
+        else:
+            return _to_java_data_type(data_type.sql_type())
     elif isinstance(data_type, MultisetType):
         j_data_type = JDataTypes.MULTISET(_to_java_data_type(data_type.element_type))
     elif isinstance(data_type, NullType):
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 5d38617bc03..50ff1839b51 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -24,7 +24,7 @@ from typing import Union, List, Type, Callable, TypeVar, Generic, Iterable
 from pyflink.java_gateway import get_gateway
 from pyflink.metrics import MetricGroup
 from pyflink.table import Expression
-from pyflink.table.types import DataType, _to_java_type, _to_java_data_type
+from pyflink.table.types import DataType, _to_java_data_type
 from pyflink.util import java_utils
 
 __all__ = ['FunctionContext', 'AggregateFunction', 'ScalarFunction', 'TableFunction',
@@ -378,7 +378,7 @@ class UserDefinedFunctionWrapper(object):
 
             if self._input_types is not None:
                 j_input_types = java_utils.to_jarray(
-                    gateway.jvm.TypeInformation, [_to_java_type(i) for i in self._input_types])
+                    gateway.jvm.DataType, [_to_java_data_type(i) for i in self._input_types])
             else:
                 j_input_types = None
             j_function_kind = get_python_function_kind()
@@ -416,7 +416,7 @@ class UserDefinedScalarFunctionWrapper(UserDefinedFunctionWrapper):
 
     def _create_judf(self, serialized_func, j_input_types, j_function_kind):
         gateway = get_gateway()
-        j_result_type = _to_java_type(self._result_type)
+        j_result_type = _to_java_data_type(self._result_type)
         PythonScalarFunction = gateway.jvm \
             .org.apache.flink.table.functions.python.PythonScalarFunction
         j_scalar_function = PythonScalarFunction(
@@ -458,9 +458,9 @@ class UserDefinedTableFunctionWrapper(UserDefinedFunctionWrapper):
 
     def _create_judf(self, serialized_func, j_input_types, j_function_kind):
         gateway = get_gateway()
-        j_result_types = java_utils.to_jarray(gateway.jvm.TypeInformation,
-                                              [_to_java_type(i) for i in self._result_types])
-        j_result_type = gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo(j_result_types)
+        j_result_types = java_utils.to_jarray(gateway.jvm.DataType,
+                                              [_to_java_data_type(i) for i in self._result_types])
+        j_result_type = gateway.jvm.DataTypes.ROW(j_result_types)
         PythonTableFunction = gateway.jvm \
             .org.apache.flink.table.functions.python.PythonTableFunction
         j_table_function = PythonTableFunction(
diff --git a/flink-python/pyflink/testing/source_sink_utils.py b/flink-python/pyflink/testing/source_sink_utils.py
index e37fed9eb70..3f713bf49e1 100644
--- a/flink-python/pyflink/testing/source_sink_utils.py
+++ b/flink-python/pyflink/testing/source_sink_utils.py
@@ -24,7 +24,7 @@ from py4j.java_gateway import java_import
 from pyflink.find_flink_home import _find_flink_source_root
 from pyflink.java_gateway import get_gateway
 from pyflink.table.sinks import TableSink
-from pyflink.table.types import _to_java_type
+from pyflink.table.types import _to_java_data_type
 from pyflink.util import java_utils
 
 
@@ -35,13 +35,7 @@ class TestTableSink(TableSink):
 
     _inited = False
 
-    def __init__(self, j_table_sink, field_names, field_types):
-        gateway = get_gateway()
-        j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
-        j_field_types = java_utils.to_jarray(
-            gateway.jvm.TypeInformation,
-            [_to_java_type(field_type) for field_type in field_types])
-        j_table_sink = j_table_sink.configure(j_field_names, j_field_types)
+    def __init__(self, j_table_sink):
         super(TestTableSink, self).__init__(j_table_sink)
 
     @classmethod
@@ -57,10 +51,8 @@ class TestTableSink(TableSink):
                 "'flink-python*-tests.jar' is not available. Will skip the related tests.")
 
         gateway = get_gateway()
-        java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestAppendSink")
-        java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestRetractSink")
-        java_import(gateway.jvm, "org.apache.flink.table.legacyutils.TestUpsertSink")
-        java_import(gateway.jvm, "org.apache.flink.table.legacyutils.RowCollector")
+        java_import(gateway.jvm, "org.apache.flink.table.utils.TestingSinks$TestAppendingSink")
+        java_import(gateway.jvm, "org.apache.flink.table.utils.TestingSinks$RowCollector")
 
         TestTableSink._inited = True
 
@@ -72,10 +64,13 @@ class TestAppendSink(TestTableSink):
 
     def __init__(self, field_names, field_types):
         TestTableSink._ensure_initialized()
-
         gateway = get_gateway()
-        super(TestAppendSink, self).__init__(
-            gateway.jvm.TestAppendSink(), field_names, field_types)
+        j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
+        j_field_types = java_utils.to_jarray(
+            gateway.jvm.DataType,
+            [_to_java_data_type(field_type) for field_type in field_types])
+        super(TestAppendSink, self).__init__(gateway.jvm.org.apache.flink.table.utils.TestingSinks
+                                             .TestAppendingSink(j_field_names, j_field_types))
 
 
 class TestRetractSink(TestTableSink):
@@ -85,10 +80,13 @@ class TestRetractSink(TestTableSink):
 
     def __init__(self, field_names, field_types):
         TestTableSink._ensure_initialized()
-
         gateway = get_gateway()
-        super(TestRetractSink, self).__init__(
-            gateway.jvm.TestRetractSink(), field_names, field_types)
+        j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
+        j_field_types = java_utils.to_jarray(
+            gateway.jvm.DataType,
+            [_to_java_data_type(field_type) for field_type in field_types])
+        super(TestRetractSink, self).__init__(gateway.jvm.org.apache.flink.table.utils.TestingSinks
+                                              .TestAppendingSink(j_field_names, j_field_types))
 
 
 class TestUpsertSink(TestTableSink):
@@ -120,8 +118,9 @@ def retract_results():
     Retrieves the results from a retract table sink.
     """
     gateway = get_gateway()
-    results = gateway.jvm.RowCollector.getAndClearValues()
-    return gateway.jvm.RowCollector.retractResults(results)
+    results = gateway.jvm.org.apache.flink.table.utils.TestingSinks.RowCollector.getAndClearValues()
+    return gateway.jvm\
+        .org.apache.flink.table.utils.TestingSinks.RowCollector.retractResults(results)
 
 
 def upsert_results(keys):
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 595167dd694..5bb30ae8f82 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.api.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.BigIntSerializer;
 import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
@@ -47,6 +49,7 @@ import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySeria
 import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
@@ -55,6 +58,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
@@ -65,14 +70,34 @@ import org.apache.flink.table.runtime.typeutils.serializers.python.StringSeriali
 import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
 
+import java.io.IOException;
+import java.lang.reflect.Array;
 import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** A util class for converting the given TypeInformation to other objects. */
 @Internal
@@ -590,4 +615,510 @@ public class PythonTypeUtils {
                             typeInformation.toString()));
         }
     }
+
+    /**
+     * Wrap the unpickled python data with an InputFormat. It will be passed to
+     * StreamExecutionEnvironment.creatInput() to create an InputFormat later.
+     *
+     * @param data The unpickled python data.
+     * @param dataType The python data type.
+     * @param config The execution config used to create serializer.
+     * @return An InputFormat containing the python data.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> InputFormat<T, ?> getCollectionInputFormat(
+            final List<T> data, final TypeInformation<T> dataType, final ExecutionConfig config) {
+        Function<Object, Object> converter = converter(dataType, config);
+        return new CollectionInputFormat<>(
+                data.stream()
+                        .map(objects -> (T) converter.apply(objects))
+                        .collect(Collectors.toList()),
+                dataType.createSerializer(config));
+    }
+
+    private static BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor(
+            final TypeInformation<?> elementType, final boolean primitiveArray) {
+        if (elementType.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    boolean[] array = new boolean[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (boolean) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Boolean[] array = new Boolean[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Boolean) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    byte[] array = new byte[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (byte) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Byte[] array = new Byte[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Byte) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    short[] array = new short[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (short) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Short[] array = new Short[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Short) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.INT_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    int[] array = new int[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (int) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Integer[] array = new Integer[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Integer) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    long[] array = new long[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (long) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Long[] array = new Long[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Long) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    float[] array = new float[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (float) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Float[] array = new Float[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Float) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
+            if (primitiveArray) {
+                return (length, elementGetter) -> {
+                    double[] array = new double[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (double) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            } else {
+                return (length, elementGetter) -> {
+                    Double[] array = new Double[length];
+                    for (int i = 0; i < length; i++) {
+                        array[i] = (Double) elementGetter.apply(i);
+                    }
+                    return array;
+                };
+            }
+        }
+        if (elementType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+            return (length, elementGetter) -> {
+                String[] array = new String[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (String) elementGetter.apply(i);
+                }
+                return array;
+            };
+        }
+        return (length, elementGetter) -> {
+            Object[] array = new Object[length];
+            for (int i = 0; i < length; i++) {
+                array[i] = elementGetter.apply(i);
+            }
+            return array;
+        };
+    }
+
+    private static Function<Object, Object> converter(
+            final TypeInformation<?> dataType, final ExecutionConfig config) {
+        if (dataType.equals(Types.BOOLEAN)) {
+            return b -> b instanceof Boolean ? b : null;
+        }
+        if (dataType.equals(Types.BYTE)) {
+            return c -> {
+                if (c instanceof Byte) {
+                    return c;
+                }
+                if (c instanceof Short) {
+                    return ((Short) c).byteValue();
+                }
+                if (c instanceof Integer) {
+                    return ((Integer) c).byteValue();
+                }
+                if (c instanceof Long) {
+                    return ((Long) c).byteValue();
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.SHORT)) {
+            return c -> {
+                if (c instanceof Byte) {
+                    return ((Byte) c).shortValue();
+                }
+                if (c instanceof Short) {
+                    return c;
+                }
+                if (c instanceof Integer) {
+                    return ((Integer) c).shortValue();
+                }
+                if (c instanceof Long) {
+                    return ((Long) c).shortValue();
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.INT)) {
+            return c -> {
+                if (c instanceof Byte) {
+                    return ((Byte) c).intValue();
+                }
+                if (c instanceof Short) {
+                    return ((Short) c).intValue();
+                }
+                if (c instanceof Integer) {
+                    return c;
+                }
+                if (c instanceof Long) {
+                    return ((Long) c).intValue();
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.LONG)) {
+            return c -> {
+                if (c instanceof Byte) {
+                    return ((Byte) c).longValue();
+                }
+                if (c instanceof Short) {
+                    return ((Short) c).longValue();
+                }
+                if (c instanceof Integer) {
+                    return ((Integer) c).longValue();
+                }
+                if (c instanceof Long) {
+                    return c;
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.FLOAT)) {
+            return c -> {
+                if (c instanceof Float) {
+                    return c;
+                }
+                if (c instanceof Double) {
+                    return ((Double) c).floatValue();
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.DOUBLE)) {
+            return c -> {
+                if (c instanceof Float) {
+                    return ((Float) c).doubleValue();
+                }
+                if (c instanceof Double) {
+                    return c;
+                }
+                return null;
+            };
+        }
+        if (dataType.equals(Types.BIG_DEC)) {
+            return c -> c instanceof BigDecimal ? c : null;
+        }
+        if (dataType.equals(Types.SQL_DATE)) {
+            return c -> {
+                if (c instanceof Integer) {
+                    long millisLocal = ((Integer) c).longValue() * 86400000;
+                    long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
+                    return new Date(millisUtc);
+                }
+                return null;
+            };
+        }
+
+        if (dataType.equals(Types.SQL_TIME)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? new Time(((Number) c).longValue() / 1000)
+                            : null;
+        }
+
+        if (dataType.equals(Types.SQL_TIMESTAMP)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? new Timestamp(((Number) c).longValue() / 1000)
+                            : null;
+        }
+
+        if (dataType.equals(Types.LOCAL_DATE)) {
+            return c -> {
+                if (c instanceof Integer) {
+                    long millisLocal = ((Integer) c).longValue() * 86400000;
+                    long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
+                    return Instant.ofEpochMilli(millisUtc)
+                            .atZone(ZoneId.systemDefault())
+                            .toLocalDate();
+                }
+                return null;
+            };
+        }
+
+        if (dataType.equals(Types.LOCAL_TIME)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+                                    .atZone(ZoneId.systemDefault())
+                                    .toLocalTime()
+                            : null;
+        }
+
+        if (dataType.equals(Types.LOCAL_DATE_TIME)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+                                    .atZone(ZoneId.systemDefault())
+                                    .toLocalDateTime()
+                            : null;
+        }
+
+        if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+                            : null;
+        }
+        if (dataType.equals(TimeIntervalTypeInfo.INTERVAL_MILLIS)) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? ((Number) c).longValue() / 1000
+                            : null;
+        }
+        if (dataType.equals(Types.STRING)) {
+            return c -> c != null ? c.toString() : null;
+        }
+        if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+            return c -> {
+                if (c instanceof String) {
+                    return ((String) c).getBytes(StandardCharsets.UTF_8);
+                }
+                if (c instanceof byte[]) {
+                    return c;
+                }
+                return null;
+            };
+        }
+        if (dataType instanceof PrimitiveArrayTypeInfo
+                || dataType instanceof BasicArrayTypeInfo
+                || dataType instanceof ObjectArrayTypeInfo) {
+            TypeInformation<?> elementType =
+                    dataType instanceof PrimitiveArrayTypeInfo
+                            ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
+                            : dataType instanceof BasicArrayTypeInfo
+                                    ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
+                                    : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
+            boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
+            Function<Object, Object> elementConverter = converter(elementType, config);
+            BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
+                    arrayConstructor(elementType, primitive);
+            return c -> {
+                int length = -1;
+                Function<Integer, Object> elementGetter = null;
+                if (c instanceof List) {
+                    length = ((List<?>) c).size();
+                    elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
+                }
+                if (c != null && c.getClass().isArray()) {
+                    length = Array.getLength(c);
+                    elementGetter = i -> elementConverter.apply(Array.get(c, i));
+                }
+                if (elementGetter != null) {
+                    return arrayConstructor.apply(length, elementGetter);
+                }
+                return null;
+            };
+        }
+        if (dataType instanceof MapTypeInfo) {
+            Function<Object, Object> keyConverter =
+                    converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
+            Function<Object, Object> valueConverter =
+                    converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
+            return c ->
+                    c instanceof Map
+                            ? ((Map<?, ?>) c)
+                                    .entrySet().stream()
+                                            .collect(
+                                                    Collectors.toMap(
+                                                            e -> keyConverter.apply(e.getKey()),
+                                                            e ->
+                                                                    valueConverter.apply(
+                                                                            e.getValue())))
+                            : null;
+        }
+        if (dataType instanceof RowTypeInfo) {
+            TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
+            List<Function<Object, Object>> fieldConverters =
+                    Arrays.stream(fieldTypes)
+                            .map(x -> converter(x, config))
+                            .collect(Collectors.toList());
+            return c -> {
+                if (c != null && c.getClass().isArray()) {
+                    int length = Array.getLength(c);
+                    if (length - 1 != fieldTypes.length) {
+                        throw new IllegalStateException(
+                                "Input row doesn't have expected number of values required by the schema. "
+                                        + fieldTypes.length
+                                        + " fields are required while "
+                                        + (length - 1)
+                                        + " values are provided.");
+                    }
+
+                    Row row = new Row(length - 1);
+                    row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
+
+                    for (int i = 0; i < row.getArity(); i++) {
+                        row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
+                    }
+
+                    return row;
+                }
+                return null;
+            };
+        }
+        if (dataType instanceof TupleTypeInfo) {
+            TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
+            List<Function<Object, Object>> fieldConverters =
+                    Arrays.stream(fieldTypes)
+                            .map(x -> converter(x, config))
+                            .collect(Collectors.toList());
+            return c -> {
+                if (c != null && c.getClass().isArray()) {
+                    int length = Array.getLength(c);
+                    if (length != fieldTypes.length) {
+                        throw new IllegalStateException(
+                                "Input tuple doesn't have expected number of values required by the schema. "
+                                        + fieldTypes.length
+                                        + " fields are required while "
+                                        + length
+                                        + " values are provided.");
+                    }
+
+                    Tuple tuple = Tuple.newInstance(length);
+                    for (int i = 0; i < tuple.getArity(); i++) {
+                        tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
+                    }
+
+                    return tuple;
+                }
+                return null;
+            };
+        }
+
+        return c -> {
+            if (c == null
+                    || c.getClass() != byte[].class
+                    || dataType instanceof PickledByteArrayTypeInfo) {
+                return c;
+            }
+
+            // other typeinfos will use the corresponding serializer to deserialize data.
+            byte[] b = (byte[]) c;
+            TypeSerializer<?> dataSerializer = dataType.createSerializer(config);
+            ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
+            DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
+            bais.setBuffer(b, 0, b.length);
+            try {
+                return dataSerializer.deserialize(baisWrapper);
+            } catch (IOException e) {
+                throw new IllegalStateException(
+                        "Failed to deserialize the object with datatype " + dataType, e);
+            }
+        };
+    }
+
+    private static int getOffsetFromLocalMillis(final long millisLocal) {
+        TimeZone localZone = TimeZone.getDefault();
+        int result = localZone.getRawOffset();
+        // the actual offset should be calculated based on milliseconds in UTC
+        int offset = localZone.getOffset(millisLocal - (long) result);
+        if (offset != result) {
+            // DayLight Saving Time
+            result = localZone.getOffset(millisLocal - (long) offset);
+            if (result != offset) {
+                // fallback to do the reverse lookup using java.time.LocalDateTime
+                // this should only happen near the start or end of DST
+                LocalDate localDate = LocalDate.ofEpochDay(millisLocal / 86400000L);
+                LocalTime localTime =
+                        LocalTime.ofNanoOfDay(
+                                Math.floorMod(millisLocal, 86400000L) * 1000L * 1000L);
+                LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
+                long millisEpoch =
+                        localDateTime.atZone(localZone.toZoneId()).toInstant().toEpochMilli();
+                result = (int) (millisLocal - millisEpoch);
+            }
+        }
+
+        return result;
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java
new file mode 100644
index 00000000000..31012f9815a
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.utils.python.PythonDynamicTableOptions.BATCH_MODE;
+import static org.apache.flink.table.utils.python.PythonDynamicTableOptions.INPUT_FILE_PATH;
+
+/** Table source factory for PythonDynamicTableSource. */
+public class PythonDynamicTableFactory implements DynamicTableSourceFactory {
+
+    public static final String IDENTIFIER = "python-input-format";
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        String inputFilePath = tableOptions.get(INPUT_FILE_PATH);
+        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+        boolean batched = tableOptions.get(BATCH_MODE);
+        return new PythonDynamicTableSource(inputFilePath, batched, schema.toPhysicalRowDataType());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(INPUT_FILE_PATH);
+        options.add(BATCH_MODE);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java
new file mode 100644
index 00000000000..68d748842d2
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for PythonDynamicTableSource. */
+public class PythonDynamicTableOptions {
+
+    public static final ConfigOption<String> INPUT_FILE_PATH =
+            ConfigOptions.key("file-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The path of the input file.");
+
+    public static final ConfigOption<Boolean> BATCH_MODE =
+            ConfigOptions.key("batched")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether the values are serialized in batch.");
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java
new file mode 100644
index 00000000000..4a306803ea0
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonDynamicTableSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.python;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.python.PythonBridgeUtils;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.IOException;
+
+/** Implementation of {@link ScanTableSource} for python elements table. */
+public class PythonDynamicTableSource implements ScanTableSource {
+    private final String filePath;
+    private final boolean batched;
+    private final DataType producedDataType;
+
+    public PythonDynamicTableSource(String filePath, boolean batched, DataType producedDataType) {
+        this.filePath = filePath;
+        this.batched = batched;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new PythonDynamicTableSource(filePath, batched, producedDataType);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Python Table Source";
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        try {
+            InputFormat<RowData, ?> inputFormat =
+                    PythonTableUtils.getInputFormat(
+                            PythonBridgeUtils.readPythonObjects(filePath, batched),
+                            producedDataType);
+            return InputFormatProvider.of(inputFormat);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format("Failed to read input data from %s.", filePath), e);
+        }
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java
deleted file mode 100644
index c6d6ff9887d..00000000000
--- a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonInputFormatTableSource.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/** An {@link InputFormatTableSource} created by python 'from_element' method. */
-@Internal
-public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {
-
-    /**
-     * The input format which contains the python data collection, usually created by {@link
-     * PythonTableUtils#getInputFormat(List, TypeInformation, ExecutionConfig)} method.
-     */
-    private final InputFormat<Row, ? extends InputSplit> inputFormat;
-
-    /**
-     * The row type info of the python data. It is generated by the python 'from_element' method.
-     */
-    private final RowTypeInfo rowTypeInfo;
-
-    public PythonInputFormatTableSource(
-            InputFormat<Row, ? extends InputSplit> inputFormat, RowTypeInfo rowTypeInfo) {
-        this.inputFormat = inputFormat;
-        this.rowTypeInfo = rowTypeInfo;
-    }
-
-    @Override
-    public InputFormat<Row, ?> getInputFormat() {
-        return inputFormat;
-    }
-
-    @Override
-    public TableSchema getTableSchema() {
-        return TableSchema.fromTypeInfo(rowTypeInfo);
-    }
-
-    @Override
-    public TypeInformation<Row> getReturnType() {
-        return rowTypeInfo;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
index f406c475edf..1d1ed0aab20 100644
--- a/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java
@@ -19,40 +19,64 @@
 package org.apache.flink.table.utils.python;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.flink.types.RowKind;
 
-import java.io.IOException;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TimeZone;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -65,205 +89,131 @@ public final class PythonTableUtils {
     private PythonTableUtils() {}
 
     /**
-     * Wrap the unpickled python data with an InputFormat. It will be passed to
-     * PythonInputFormatTableSource later.
+     * Create a table from {@link PythonDynamicTableSource} that read data from input file with
+     * specific {@link DataType}.
      *
-     * @param data The unpickled python data.
-     * @param dataType The python data type.
-     * @param config The execution config used to create serializer.
-     * @return An InputFormat containing the python data.
+     * @param tEnv The TableEnvironment to create table.
+     * @param filePath the file path of the input data.
+     * @param schema The python data type.
+     * @param batched Whether to read data in a batch
+     * @return Table with InputFormat.
      */
-    public static InputFormat<Row, ?> getInputFormat(
-            final List<Object[]> data,
-            final TypeInformation<Row> dataType,
-            final ExecutionConfig config) {
-        Function<Object, Object> converter = converter(dataType, config);
-        return new CollectionInputFormat<>(
-                data.stream()
-                        .map(objects -> (Row) converter.apply(objects))
-                        .collect(Collectors.toList()),
-                dataType.createSerializer(config));
+    public static Table createTableFromElement(
+            TableEnvironment tEnv, String filePath, DataType schema, boolean batched) {
+        TableDescriptor.Builder builder =
+                TableDescriptor.forConnector(PythonDynamicTableFactory.IDENTIFIER)
+                        .option(PythonDynamicTableOptions.INPUT_FILE_PATH, filePath)
+                        .option(PythonDynamicTableOptions.BATCH_MODE, batched)
+                        .schema(Schema.newBuilder().fromRowDataType(schema).build());
+        return tEnv.from(builder.build());
     }
 
     /**
      * Wrap the unpickled python data with an InputFormat. It will be passed to
-     * StreamExecutionEnvironment.creatInput() to create an InputFormat later.
+     * PythonDynamicTableSource later.
      *
      * @param data The unpickled python data.
      * @param dataType The python data type.
-     * @param config The execution config used to create serializer.
      * @return An InputFormat containing the python data.
      */
-    @SuppressWarnings("unchecked")
-    public static <T> InputFormat<T, ?> getCollectionInputFormat(
-            final List<T> data, final TypeInformation<T> dataType, final ExecutionConfig config) {
-        Function<Object, Object> converter = converter(dataType, config);
-        return new CollectionInputFormat<>(
+    public static InputFormat<RowData, ?> getInputFormat(
+            final List<Object[]> data, final DataType dataType) {
+        Function<Object, Object> converter = converter(dataType.getLogicalType());
+        Collection<RowData> dataCollection =
                 data.stream()
-                        .map(objects -> (T) converter.apply(objects))
-                        .collect(Collectors.toList()),
-                dataType.createSerializer(config));
+                        .map(objects -> (RowData) converter.apply(objects))
+                        .collect(Collectors.toList());
+        return new CollectionInputFormat<>(
+                dataCollection, InternalSerializers.create(dataType.getLogicalType()));
     }
 
     private static BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor(
-            final TypeInformation<?> elementType, final boolean primitiveArray) {
-        if (elementType.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    boolean[] array = new boolean[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (boolean) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Boolean[] array = new Boolean[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Boolean) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
-        }
-        if (elementType.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    byte[] array = new byte[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (byte) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Byte[] array = new Byte[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Byte) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+            final LogicalType elementType) {
+        if (elementType instanceof BooleanType) {
+            return (length, elementGetter) -> {
+                Boolean[] array = new Boolean[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Boolean) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    short[] array = new short[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (short) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Short[] array = new Short[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Short) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+        if (elementType instanceof TinyIntType) {
+            return (length, elementGetter) -> {
+                Byte[] array = new Byte[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Byte) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.INT_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    int[] array = new int[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (int) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Integer[] array = new Integer[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Integer) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+
+        if (elementType instanceof IntType) {
+            return (length, elementGetter) -> {
+                Integer[] array = new Integer[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Integer) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    long[] array = new long[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (long) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Long[] array = new Long[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Long) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+        if (elementType instanceof BigIntType) {
+            return (length, elementGetter) -> {
+                Long[] array = new Long[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Long) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    float[] array = new float[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (float) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Float[] array = new Float[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Float) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+        if (elementType instanceof FloatType) {
+            return (length, elementGetter) -> {
+                Float[] array = new Float[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Float) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
-            if (primitiveArray) {
-                return (length, elementGetter) -> {
-                    double[] array = new double[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (double) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            } else {
-                return (length, elementGetter) -> {
-                    Double[] array = new Double[length];
-                    for (int i = 0; i < length; i++) {
-                        array[i] = (Double) elementGetter.apply(i);
-                    }
-                    return array;
-                };
-            }
+        if (elementType instanceof DoubleType) {
+            return (length, elementGetter) -> {
+                Double[] array = new Double[length];
+                for (int i = 0; i < length; i++) {
+                    array[i] = (Double) elementGetter.apply(i);
+                }
+                return new GenericArrayData(array);
+            };
         }
-        if (elementType.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+        if (elementType instanceof CharType || elementType instanceof VarCharType) {
             return (length, elementGetter) -> {
-                String[] array = new String[length];
+                StringData[] array = new StringData[length];
                 for (int i = 0; i < length; i++) {
-                    array[i] = (String) elementGetter.apply(i);
+                    array[i] = (StringData) elementGetter.apply(i);
                 }
-                return array;
+                return new GenericArrayData(array);
             };
         }
+
         return (length, elementGetter) -> {
             Object[] array = new Object[length];
             for (int i = 0; i < length; i++) {
                 array[i] = elementGetter.apply(i);
             }
-            return array;
+            return new GenericArrayData(array);
         };
     }
 
-    private static Function<Object, Object> converter(
-            final TypeInformation<?> dataType, final ExecutionConfig config) {
-        if (dataType.equals(Types.BOOLEAN())) {
+    private static Function<Object, Object> converter(LogicalType logicalType) {
+
+        if (logicalType instanceof NullType) {
+            return n -> null;
+        }
+
+        if (logicalType instanceof BooleanType) {
             return b -> b instanceof Boolean ? b : null;
         }
-        if (dataType.equals(Types.BYTE())) {
+
+        if (logicalType instanceof TinyIntType) {
             return c -> {
                 if (c instanceof Byte) {
                     return c;
@@ -280,7 +230,8 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.SHORT())) {
+
+        if (logicalType instanceof SmallIntType) {
             return c -> {
                 if (c instanceof Byte) {
                     return ((Byte) c).shortValue();
@@ -297,7 +248,8 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.INT())) {
+
+        if (logicalType instanceof IntType) {
             return c -> {
                 if (c instanceof Byte) {
                     return ((Byte) c).intValue();
@@ -314,7 +266,8 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.LONG())) {
+
+        if (logicalType instanceof BigIntType) {
             return c -> {
                 if (c instanceof Byte) {
                     return ((Byte) c).longValue();
@@ -331,7 +284,8 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.FLOAT())) {
+
+        if (logicalType instanceof FloatType) {
             return c -> {
                 if (c instanceof Float) {
                     return c;
@@ -342,7 +296,7 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.DOUBLE())) {
+        if (logicalType instanceof DoubleType) {
             return c -> {
                 if (c instanceof Float) {
                     return ((Float) c).doubleValue();
@@ -353,48 +307,81 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType.equals(Types.DECIMAL())) {
-            return c -> c instanceof BigDecimal ? c : null;
+
+        if (logicalType instanceof DecimalType) {
+            int precision = ((DecimalType) logicalType).getPrecision();
+            int scale = ((DecimalType) logicalType).getScale();
+            return c ->
+                    c instanceof BigDecimal
+                            ? DecimalData.fromBigDecimal((BigDecimal) c, precision, scale)
+                            : null;
         }
-        if (dataType.equals(Types.SQL_DATE())) {
+
+        if (logicalType instanceof DateType) {
             return c -> {
                 if (c instanceof Integer) {
-                    long millisLocal = ((Integer) c).longValue() * 86400000;
-                    long millisUtc =
-                            millisLocal - PythonTableUtils.getOffsetFromLocalMillis(millisLocal);
-                    return new Date(millisUtc);
+                    return (Integer) c;
                 }
                 return null;
             };
         }
-        if (dataType.equals(Types.SQL_TIME())) {
+
+        if (logicalType instanceof TimeType) {
+            return c -> {
+                if (c instanceof Integer || c instanceof Long) {
+                    long millisLocal = ((Number) c).longValue() / 1000;
+                    long millisUtc = millisLocal + getOffsetFromLocalMillis(millisLocal);
+                    return (int) millisUtc;
+                }
+                return null;
+            };
+        }
+
+        if (logicalType instanceof TimestampType) {
+            return c ->
+                    c instanceof Integer || c instanceof Long
+                            ? TimestampData.fromLocalDateTime(
+                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+                                            .atZone(ZoneId.systemDefault())
+                                            .toLocalDateTime())
+                            : null;
+        }
+
+        if (logicalType instanceof ZonedTimestampType) {
             return c ->
                     c instanceof Integer || c instanceof Long
-                            ? new Time(((Number) c).longValue() / 1000)
+                            ? TimestampData.fromInstant(
+                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                             : null;
         }
-        if (dataType.equals(Types.SQL_TIMESTAMP())) {
+
+        if (logicalType instanceof LocalZonedTimestampType) {
             return c ->
                     c instanceof Integer || c instanceof Long
-                            ? new Timestamp(((Number) c).longValue() / 1000)
+                            ? TimestampData.fromInstant(
+                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                             : null;
         }
-        if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
+
+        if (logicalType instanceof DayTimeIntervalType) {
             return c ->
                     c instanceof Integer || c instanceof Long
-                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
+                            ? ((Number) c).longValue() / 1000
                             : null;
         }
-        if (dataType.equals(Types.INTERVAL_MILLIS())) {
+
+        if (logicalType instanceof YearMonthIntervalType) {
             return c ->
                     c instanceof Integer || c instanceof Long
                             ? ((Number) c).longValue() / 1000
                             : null;
         }
-        if (dataType.equals(Types.STRING())) {
-            return c -> c != null ? c.toString() : null;
+
+        if (logicalType instanceof CharType || logicalType instanceof VarCharType) {
+            return c -> c != null ? StringData.fromString(c.toString()) : null;
         }
-        if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
+
+        if (logicalType instanceof BinaryType || logicalType instanceof VarBinaryType) {
             return c -> {
                 if (c instanceof String) {
                     return ((String) c).getBytes(StandardCharsets.UTF_8);
@@ -405,19 +392,12 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType instanceof PrimitiveArrayTypeInfo
-                || dataType instanceof BasicArrayTypeInfo
-                || dataType instanceof ObjectArrayTypeInfo) {
-            TypeInformation<?> elementType =
-                    dataType instanceof PrimitiveArrayTypeInfo
-                            ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
-                            : dataType instanceof BasicArrayTypeInfo
-                                    ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
-                                    : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
-            boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
-            Function<Object, Object> elementConverter = converter(elementType, config);
+
+        if (logicalType instanceof ArrayType) {
+            LogicalType elementType = ((ArrayType) logicalType).getElementType();
+            Function<Object, Object> elementConverter = converter(elementType);
             BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
-                    arrayConstructor(elementType, primitive);
+                    arrayConstructor(elementType);
             return c -> {
                 int length = -1;
                 Function<Integer, Object> elementGetter = null;
@@ -435,28 +415,39 @@ public final class PythonTableUtils {
                 return null;
             };
         }
-        if (dataType instanceof MapTypeInfo) {
-            Function<Object, Object> keyConverter =
-                    converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
+
+        if (logicalType instanceof MultisetType) {
+            return c -> c;
+        }
+
+        if (logicalType instanceof MapType) {
+            Function<Object, Object> keyConverter = converter(((MapType) logicalType).getKeyType());
             Function<Object, Object> valueConverter =
-                    converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
-            return c ->
-                    c instanceof Map
-                            ? ((Map<?, ?>) c)
+                    converter(((MapType) logicalType).getValueType());
+
+            return c -> {
+                if (c instanceof Map) {
+                    Map<?, ?> mapData =
+                            ((Map<?, ?>) c)
                                     .entrySet().stream()
                                             .collect(
                                                     Collectors.toMap(
                                                             e -> keyConverter.apply(e.getKey()),
                                                             e ->
                                                                     valueConverter.apply(
-                                                                            e.getValue())))
-                            : null;
+                                                                            e.getValue())));
+                    return new GenericMapData(mapData);
+                } else {
+                    return null;
+                }
+            };
         }
-        if (dataType instanceof RowTypeInfo) {
-            TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
+
+        if (logicalType instanceof RowType) {
+            LogicalType[] fieldTypes = logicalType.getChildren().toArray(new LogicalType[0]);
             List<Function<Object, Object>> fieldConverters =
                     Arrays.stream(fieldTypes)
-                            .map(x -> PythonTableUtils.converter(x, config))
+                            .map(PythonTableUtils::converter)
                             .collect(Collectors.toList());
             return c -> {
                 if (c != null && c.getClass().isArray()) {
@@ -470,8 +461,8 @@ public final class PythonTableUtils {
                                         + " values are provided.");
                     }
 
-                    Row row = new Row(length - 1);
-                    row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
+                    GenericRowData row = new GenericRowData(length - 1);
+                    row.setRowKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
 
                     for (int i = 0; i < row.getArity(); i++) {
                         row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
@@ -481,56 +472,19 @@ public final class PythonTableUtils {
                 }
                 return null;
             };
-        }
-        if (dataType instanceof TupleTypeInfo) {
-            TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
-            List<Function<Object, Object>> fieldConverters =
-                    Arrays.stream(fieldTypes)
-                            .map(x -> PythonTableUtils.converter(x, config))
-                            .collect(Collectors.toList());
-            return c -> {
-                if (c != null && c.getClass().isArray()) {
-                    int length = Array.getLength(c);
-                    if (length != fieldTypes.length) {
-                        throw new IllegalStateException(
-                                "Input tuple doesn't have expected number of values required by the schema. "
-                                        + fieldTypes.length
-                                        + " fields are required while "
-                                        + length
-                                        + " values are provided.");
-                    }
-
-                    Tuple tuple = Tuple.newInstance(length);
-                    for (int i = 0; i < tuple.getArity(); i++) {
-                        tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
-                    }
-
-                    return tuple;
-                }
-                return null;
-            };
-        }
-
-        return c -> {
-            if (c == null
-                    || c.getClass() != byte[].class
-                    || dataType instanceof PickledByteArrayTypeInfo) {
-                return c;
+        } else if (logicalType instanceof StructuredType) {
+            Optional<Class<?>> implClass = ((StructuredType) logicalType).getImplementationClass();
+            if (implClass.isPresent()
+                    && (implClass.get() == ListView.class || implClass.get() == MapView.class)) {
+                return converter(logicalType.getChildren().get(0));
             }
+            throw new IllegalStateException(
+                    "Failed to get the data converter for StructuredType with implementation "
+                            + "class: "
+                            + implClass.orElse(null));
+        }
 
-            // other typeinfos will use the corresponding serializer to deserialize data.
-            byte[] b = (byte[]) c;
-            TypeSerializer<?> dataSerializer = dataType.createSerializer(config);
-            ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
-            DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
-            bais.setBuffer(b, 0, b.length);
-            try {
-                return dataSerializer.deserialize(baisWrapper);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                        "Failed to deserialize the object with datatype " + dataType, e);
-            }
-        };
+        throw new IllegalStateException("Failed to get converter for LogicalType: " + logicalType);
     }
 
     private static int getOffsetFromLocalMillis(final long millisLocal) {
diff --git a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 92%
copy from flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 511a31a088b..a1a1ce73c4a 100644
--- a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-python/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.legacyutils.TestCollectionTableFactory
+org.apache.flink.table.utils.python.PythonDynamicTableFactory
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
deleted file mode 100644
index ad9ebb44fad..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/ByteMaxAggFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.functions.AggregateFunction;
-
-/** {@link AggregateFunction} for {@link Byte}. */
-public class ByteMaxAggFunction extends AggregateFunction<Byte, MaxAccumulator<Byte>> {
-
-    private static final long serialVersionUID = 1233840393767061909L;
-
-    @Override
-    public MaxAccumulator<Byte> createAccumulator() {
-        final MaxAccumulator<Byte> acc = new MaxAccumulator<>();
-        resetAccumulator(acc);
-        return acc;
-    }
-
-    public void accumulate(MaxAccumulator<Byte> acc, Byte value) {
-        if (value != null) {
-            if (!acc.f1 || Byte.compare(acc.f0, value) < 0) {
-                acc.f0 = value;
-                acc.f1 = true;
-            }
-        }
-    }
-
-    @Override
-    public Byte getValue(MaxAccumulator<Byte> acc) {
-        if (acc.f1) {
-            return acc.f0;
-        } else {
-            return null;
-        }
-    }
-
-    public void merge(MaxAccumulator<Byte> acc, Iterable<MaxAccumulator<Byte>> its) {
-        its.forEach(
-                a -> {
-                    if (a.f1) {
-                        accumulate(acc, a.f0);
-                    }
-                });
-    }
-
-    public void resetAccumulator(MaxAccumulator<Byte> acc) {
-        acc.f0 = 0;
-        acc.f1 = false;
-    }
-
-    @Override
-    public TypeInformation<MaxAccumulator<Byte>> getAccumulatorType() {
-        return new TupleTypeInfo(
-                MaxAccumulator.class,
-                BasicTypeInfo.BYTE_TYPE_INFO,
-                BasicTypeInfo.BOOLEAN_TYPE_INFO);
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
deleted file mode 100644
index d69cce51acf..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomAssigner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
-import org.apache.flink.types.Row;
-
-/** A watermark assigner that throws an exception if a watermark is requested. */
-public class CustomAssigner extends PunctuatedWatermarkAssigner {
-    private static final long serialVersionUID = -4900176786361416000L;
-
-    @Override
-    public Watermark getWatermark(Row row, long timestamp) {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
deleted file mode 100644
index cf6673315aa..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/CustomExtractor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionUtils;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedFieldReference;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.util.Preconditions;
-
-/** A timestamp extractor that looks for the SQL_TIMESTAMP "ts" field. */
-public class CustomExtractor extends TimestampExtractor {
-    private static final long serialVersionUID = 6784900460276023738L;
-
-    private final String field = "ts";
-
-    @Override
-    public String[] getArgumentFields() {
-        return new String[] {field};
-    }
-
-    @Override
-    public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
-        if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
-            throw new ValidationException(
-                    String.format(
-                            "Field 'ts' must be of type Timestamp but is of type %s.",
-                            argumentFieldTypes[0]));
-        }
-    }
-
-    @Override
-    public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
-        ResolvedFieldReference fieldAccess = fieldAccesses[0];
-        Preconditions.checkArgument(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
-        FieldReferenceExpression fieldReferenceExpr =
-                new FieldReferenceExpression(
-                        fieldAccess.name(),
-                        TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
-                        0,
-                        fieldAccess.fieldIndex());
-        return ApiExpressionUtils.unresolvedCall(
-                BuiltInFunctionDefinitions.CAST,
-                fieldReferenceExpr,
-                ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
deleted file mode 100644
index 191aa8754a4..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/MaxAccumulator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/** Utility class to make working with tuples more readable. */
-public class MaxAccumulator<T> extends Tuple2<T, Boolean> {
-    private static final long serialVersionUID = 6089142148200600733L;
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
deleted file mode 100644
index 0b28990908e..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RichFunc0.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.ScalarFunction;
-
-import static org.assertj.core.api.Assertions.fail;
-
-/**
- * Testing scalar function to verify that lifecycle methods are called in the expected order and
- * only once.
- */
-public class RichFunc0 extends ScalarFunction {
-    private static final long serialVersionUID = 931156471687322386L;
-
-    private boolean openCalled = false;
-    private boolean closeCalled = false;
-
-    @Override
-    public void open(FunctionContext context) throws Exception {
-        super.open(context);
-        if (openCalled) {
-            fail("Open called more than once.");
-        } else {
-            openCalled = true;
-        }
-        if (closeCalled) {
-            fail("Close called before open.");
-        }
-    }
-
-    public void eval(int index) {
-        if (!openCalled) {
-            fail("Open was not called before eval.");
-        }
-        if (closeCalled) {
-            fail("Close called before eval.");
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        if (closeCalled) {
-            fail("Close called more than once.");
-        } else {
-            closeCalled = true;
-        }
-        if (!openCalled) {
-            fail("Open was not called before close.");
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
deleted file mode 100644
index 9fed5168e25..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/** A collector for storing results in memory. */
-class RowCollector {
-    private static final ArrayDeque<Tuple2<Boolean, Row>> sink = new ArrayDeque<>();
-
-    public static void addValue(Tuple2<Boolean, Row> value) {
-        synchronized (sink) {
-            sink.add(value.copy());
-        }
-    }
-
-    public static List<Tuple2<Boolean, Row>> getAndClearValues() {
-        final ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<>(sink);
-        sink.clear();
-        return out;
-    }
-
-    public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
-        final Map<String, Integer> retracted =
-                results.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        r -> r.f1.toString(),
-                                        Collectors.mapping(
-                                                r -> r.f0 ? 1 : -1,
-                                                Collectors.reducing(
-                                                        0, (left, right) -> left + right))));
-
-        if (retracted.values().stream().anyMatch(c -> c < 0)) {
-            throw new AssertionError("Received retracted rows which have not been accumulated.");
-        }
-
-        return retracted.entrySet().stream()
-                .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> e.getKey()))
-                .collect(Collectors.toList());
-    }
-
-    public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
-        final HashMap<Row, String> upserted = new HashMap<>();
-        for (Tuple2<Boolean, Row> r : results) {
-            final Row key = Row.project(r.f1, keys);
-            if (r.f0) {
-                upserted.put(key, r.f1.toString());
-            } else {
-                upserted.remove(key);
-            }
-        }
-        return new ArrayList<>(upserted.values());
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
deleted file mode 100644
index 0051d9ceb6f..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/RowSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.types.Row;
-
-/** A sink that stores data in the {@link RowCollector}. */
-public class RowSink implements SinkFunction<Tuple2<Boolean, Row>> {
-    private static final long serialVersionUID = -7264802354440479084L;
-
-    @Override
-    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
-        RowCollector.addValue(value);
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
deleted file mode 100644
index 4e34e65106b..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TableFunc1.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.table.functions.TableFunction;
-
-/** A function that splits strings, optionally adding a prefix. */
-public class TableFunc1 extends TableFunction<String> {
-
-    private static final long serialVersionUID = -5471603822898040617L;
-
-    public void eval(String str) {
-        if (str.contains("#")) {
-            for (String s : str.split("#")) {
-                collect(s);
-            }
-        }
-    }
-
-    public void eval(String str, String prefix) {
-        if (str.contains("#")) {
-            for (String s : str.split("#")) {
-                collect(prefix + s);
-            }
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
deleted file mode 100644
index cde54670581..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestAppendSink.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/** Testing append sink. */
-public class TestAppendSink implements AppendStreamTableSink<Row> {
-
-    private String[] fNames = null;
-    private TypeInformation<?>[] fTypes = null;
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-        return dataStream
-                .map(
-                        new MapFunction<Row, Tuple2<Boolean, Row>>() {
-                            private static final long serialVersionUID = 4671583708680989488L;
-
-                            @Override
-                            public Tuple2<Boolean, Row> map(Row value) throws Exception {
-                                return Tuple2.of(true, value);
-                            }
-                        })
-                .addSink(new RowSink());
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fTypes, fNames);
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fTypes;
-    }
-
-    @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        final TestAppendSink copy = new TestAppendSink();
-        copy.fNames = fieldNames;
-        copy.fTypes = fieldTypes;
-        return copy;
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
deleted file mode 100644
index dfa2db4149e..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestCollectionTableFactory.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.LookupableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Factory for the testing sinks. */
-public class TestCollectionTableFactory
-        implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {
-
-    private static boolean isStreaming = true;
-
-    private static final List<Row> SOURCE_DATA = new LinkedList<>();
-    private static final List<Row> DIM_DATA = new LinkedList<>();
-    private static final List<Row> RESULT = new LinkedList<>();
-
-    private long emitIntervalMS = -1L;
-
-    @Override
-    public TableSource<Row> createTableSource(Map<String, String> properties) {
-        return getCollectionSource(properties, isStreaming);
-    }
-
-    @Override
-    public TableSink<Row> createTableSink(Map<String, String> properties) {
-        return getCollectionSink(properties);
-    }
-
-    @Override
-    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
-        return getCollectionSource(properties, true);
-    }
-
-    @Override
-    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
-        return getCollectionSink(properties);
-    }
-
-    private CollectionTableSource getCollectionSource(
-            Map<String, String> props, boolean isStreaming) {
-        final DescriptorProperties properties = new DescriptorProperties();
-        properties.putProperties(props);
-        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
-        final Optional<Integer> parallelism = properties.getOptionalInt("parallelism");
-        return new CollectionTableSource(emitIntervalMS, schema, isStreaming, parallelism);
-    }
-
-    private CollectionTableSink getCollectionSink(Map<String, String> props) {
-        final DescriptorProperties properties = new DescriptorProperties();
-        properties.putProperties(props);
-        final TableSchema schema = properties.getTableSchema(Schema.SCHEMA);
-        return new CollectionTableSink((RowTypeInfo) schema.toRowType());
-    }
-
-    @Override
-    public Map<String, String> requiredContext() {
-        final HashMap<String, String> context = new HashMap<>();
-        context.put(ConnectorDescriptorValidator.CONNECTOR, "COLLECTION");
-        return context;
-    }
-
-    @Override
-    public List<String> supportedProperties() {
-        return Arrays.asList("*");
-    }
-
-    private static class CollectionTableSource
-            implements StreamTableSource<Row>, LookupableTableSource<Row> {
-        private final long emitIntervalMs;
-        private final TableSchema schema;
-        private final boolean isStreaming;
-        private final Optional<Integer> parallelism;
-        private final TypeInformation<Row> rowType;
-
-        private CollectionTableSource(
-                long emitIntervalMs,
-                TableSchema schema,
-                boolean isStreaming,
-                Optional<Integer> parallelism) {
-            this.emitIntervalMs = emitIntervalMs;
-            this.schema = schema;
-            this.isStreaming = isStreaming;
-            this.parallelism = parallelism;
-            this.rowType = schema.toRowType();
-        }
-
-        @Override
-        public boolean isBounded() {
-            return !isStreaming;
-        }
-
-        @Override
-        public DataStream<Row> getDataStream(StreamExecutionEnvironment streamEnv) {
-            final DataStreamSource<Row> dataStream =
-                    streamEnv.createInput(
-                            new TestCollectionInputFormat<>(
-                                    emitIntervalMs,
-                                    SOURCE_DATA,
-                                    rowType.createSerializer(new ExecutionConfig())),
-                            rowType);
-            if (parallelism.isPresent()) {
-                dataStream.setParallelism(parallelism.get());
-            }
-            return dataStream;
-        }
-
-        @Override
-        public TypeInformation<Row> getReturnType() {
-            return rowType;
-        }
-
-        @Override
-        public TableSchema getTableSchema() {
-            return schema;
-        }
-
-        @Override
-        public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
-            final String[] schemaFieldNames = schema.getFieldNames();
-            final int[] keys =
-                    Arrays.stream(lookupKeys)
-                            .map(
-                                    k -> {
-                                        for (int x = 0; x < schemaFieldNames.length; x++) {
-                                            if (k.equals(schemaFieldNames[x])) {
-                                                return x;
-                                            }
-                                        }
-                                        throw new IllegalStateException();
-                                    })
-                            .mapToInt(i -> i)
-                            .toArray();
-
-            return new TemporalTableFetcher(DIM_DATA, keys);
-        }
-
-        @Override
-        public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
-            return null;
-        }
-
-        @Override
-        public boolean isAsyncEnabled() {
-            return false;
-        }
-    }
-
-    private static class CollectionTableSink implements AppendStreamTableSink<Row> {
-        private final RowTypeInfo outputType;
-
-        private CollectionTableSink(RowTypeInfo outputType) {
-            this.outputType = outputType;
-        }
-
-        @Override
-        public RowTypeInfo getOutputType() {
-            return outputType;
-        }
-
-        @Override
-        public String[] getFieldNames() {
-            return outputType.getFieldNames();
-        }
-
-        @Override
-        public TypeInformation<?>[] getFieldTypes() {
-            return outputType.getFieldTypes();
-        }
-
-        @Override
-        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-            return dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1);
-        }
-
-        @Override
-        public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-            return this;
-        }
-    }
-
-    private static class UnsafeMemorySinkFunction extends RichSinkFunction<Row> {
-        private static final long serialVersionUID = -7880686562734099699L;
-
-        private final TypeInformation<Row> outputType;
-        private TypeSerializer<Row> serializer = null;
-
-        private UnsafeMemorySinkFunction(TypeInformation<Row> outputType) {
-            this.outputType = outputType;
-        }
-
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            serializer = outputType.createSerializer(new ExecutionConfig());
-        }
-
-        @Override
-        public void invoke(Row row, Context context) throws Exception {
-            RESULT.add(serializer.copy(row));
-        }
-    }
-
-    private static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
-
-        private static final long serialVersionUID = -3222731547793350189L;
-
-        private final long emitIntervalMs;
-
-        public TestCollectionInputFormat(
-                long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
-            super(dataSet, serializer);
-            this.emitIntervalMs = emitIntervalMs;
-        }
-
-        @Override
-        public boolean reachedEnd() throws IOException {
-            if (emitIntervalMs > 0) {
-                try {
-                    Thread.sleep(emitIntervalMs);
-                } catch (InterruptedException e) {
-                }
-            }
-            return super.reachedEnd();
-        }
-    }
-
-    private static class TemporalTableFetcher extends TableFunction<Row> {
-        private static final long serialVersionUID = 6248306950388784015L;
-
-        private final List<Row> dimData;
-        private final int[] keys;
-
-        private TemporalTableFetcher(List<Row> dimData, int[] keys) {
-            this.dimData = dimData;
-            this.keys = keys;
-        }
-
-        public void eval(Row values) {
-            for (Row data : dimData) {
-                boolean matched = true;
-                int idx = 0;
-                while (matched && idx < keys.length) {
-                    final Object dimField = data.getField(keys[idx]);
-                    final Object inputField = values.getField(idx);
-                    matched = dimField.equals(inputField);
-                    idx += 1;
-                }
-                if (matched) {
-                    // copy the row data
-                    final Row ret = new Row(data.getArity());
-                    for (int x = 0; x < data.getArity(); x++) {
-                        ret.setField(x, data.getField(x));
-                    }
-                    collect(ret);
-                }
-            }
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
deleted file mode 100644
index 449aea756e4..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestRetractSink.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/** Testing retract sink. */
-public class TestRetractSink implements RetractStreamTableSink<Row> {
-
-    private String[] fNames = null;
-    private TypeInformation<?>[] fTypes = null;
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        return dataStream.addSink(new RowSink());
-    }
-
-    @Override
-    public TypeInformation<Row> getRecordType() {
-        return new RowTypeInfo(fTypes, fNames);
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fTypes;
-    }
-
-    @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(
-            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        final TestRetractSink copy = new TestRetractSink();
-        copy.fNames = fieldNames;
-        copy.fTypes = fieldTypes;
-        return copy;
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java b/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
deleted file mode 100644
index 1740d5380a8..00000000000
--- a/flink-python/src/test/java/org/apache/flink/table/legacyutils/TestUpsertSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.legacyutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sinks.UpsertStreamTableSink;
-import org.apache.flink.types.Row;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** testing upsert sink. */
-public class TestUpsertSink implements UpsertStreamTableSink<Row> {
-
-    private String[] fNames = null;
-    private TypeInformation<?>[] fTypes = null;
-
-    private final String[] expectedKeys;
-    private final boolean expectedIsAppendOnly;
-
-    public TestUpsertSink(String[] expectedKeys, boolean expectedIsAppendOnly) {
-        this.expectedKeys = expectedKeys;
-        this.expectedIsAppendOnly = expectedIsAppendOnly;
-    }
-
-    @Override
-    public void setKeyFields(String[] keys) {
-        assertThat(keys)
-                .as("Provided key fields do not match expected keys")
-                .containsExactlyInAnyOrder(expectedKeys);
-    }
-
-    @Override
-    public void setIsAppendOnly(Boolean isAppendOnly) {
-        assertThat(isAppendOnly)
-                .as("Provided isAppendOnly does not match expected isAppendOnly")
-                .isEqualTo(expectedIsAppendOnly);
-    }
-
-    @Override
-    public TypeInformation<Row> getRecordType() {
-        return new RowTypeInfo(fTypes, fNames);
-    }
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> s) {
-        return s.addSink(new RowSink());
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fTypes;
-    }
-
-    @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(
-            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        final TestUpsertSink copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly);
-        copy.fNames = fieldNames;
-        copy.fTypes = fieldTypes;
-        return copy;
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java
new file mode 100644
index 00000000000..125c1317352
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestCollectionTableFactory.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Testing CollectionTableFactory that creates collection DynamicTableSource and DynamicTableSink.
+ */
+public class TestCollectionTableFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static boolean isStreaming = true;
+    private static final LinkedList<Row> SOURCE_DATA = new LinkedList<>();
+    private static final LinkedList<Row> DIM_DATA = new LinkedList<>();
+    private static final LinkedList<Row> RESULT = new LinkedList<>();
+
+    private static long emitIntervalMS = -1L;
+
+    public static void initData(List<Row> sourceData, List<Row> dimData, Long emitInterval) {
+        SOURCE_DATA.addAll(sourceData);
+        DIM_DATA.addAll(dimData);
+        emitIntervalMS = emitInterval == null ? -1L : emitInterval;
+    }
+
+    public static void reset() {
+        RESULT.clear();
+        SOURCE_DATA.clear();
+        DIM_DATA.clear();
+        emitIntervalMS = -1L;
+    }
+
+    public static CollectionTableSource getCollectionSource(
+            ResolvedCatalogTable catalogTable, boolean isStreaming) {
+        String parallelismProp = catalogTable.getOptions().getOrDefault("parallelism", null);
+        Optional<Integer> parallelism;
+        if (parallelismProp == null) {
+            parallelism = Optional.empty();
+        } else {
+            parallelism = Optional.of(Integer.parseInt(parallelismProp));
+        }
+        return new CollectionTableSource(
+                emitIntervalMS,
+                catalogTable.getResolvedSchema().toSourceRowDataType(),
+                isStreaming,
+                parallelism);
+    }
+
+    public static CollectionTableSink getCollectionSink(ResolvedCatalogTable catalogTable) {
+        return new CollectionTableSink(catalogTable.getResolvedSchema().toSinkRowDataType());
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        return getCollectionSource(
+                context.getCatalogTable(), TestCollectionTableFactory.isStreaming);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "COLLECTION";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        return getCollectionSink(context.getCatalogTable());
+    }
+
+    static class CollectionTableSource implements ScanTableSource, LookupTableSource {
+
+        private final Long emitIntervalMS;
+        private final DataType rowType;
+        private final boolean isStreaming;
+        private final Optional<Integer> parallelism;
+
+        public CollectionTableSource(
+                Long emitIntervalMS,
+                DataType rowType,
+                boolean isStreaming,
+                Optional<Integer> parallelism) {
+            this.emitIntervalMS = emitIntervalMS;
+            this.rowType = rowType;
+            this.isStreaming = isStreaming;
+            this.parallelism = parallelism;
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            return new CollectionTableSource(emitIntervalMS, rowType, isStreaming, parallelism);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "CollectionTableSource";
+        }
+
+        @Override
+        public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+            int[] lookupIndices = Arrays.stream(context.getKeys()).mapToInt(k -> k[0]).toArray();
+            return TableFunctionProvider.of(new TemporalTableFetcher(DIM_DATA, lookupIndices));
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+            TypeInformation<RowData> type = runtimeProviderContext.createTypeInformation(rowType);
+            TypeSerializer<RowData> serializer = type.createSerializer(new ExecutionConfig());
+            DataStructureConverter converter =
+                    runtimeProviderContext.createDataStructureConverter(rowType);
+            List<RowData> rowData =
+                    SOURCE_DATA.stream()
+                            .map(row -> (RowData) converter.toInternal(row))
+                            .collect(Collectors.toList());
+
+            return new DataStreamScanProvider() {
+                @Override
+                public DataStream<RowData> produceDataStream(
+                        ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+                    DataStreamSource<RowData> dataStream =
+                            execEnv.createInput(
+                                    new TestCollectionInputFormat<>(
+                                            emitIntervalMS, rowData, serializer),
+                                    type);
+                    parallelism.ifPresent(dataStream::setParallelism);
+                    return dataStream;
+                }
+
+                @Override
+                public boolean isBounded() {
+                    return !isStreaming;
+                }
+            };
+        }
+    }
+
+    static class CollectionTableSink implements DynamicTableSink {
+
+        private final DataType outputType;
+
+        public CollectionTableSink(DataType outputType) {
+            this.outputType = outputType;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return requestedMode;
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            TypeInformation<Row> typeInformation = context.createTypeInformation(outputType);
+            DataStructureConverter converter = context.createDataStructureConverter(outputType);
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    return dataStream
+                            .addSink(new UnsafeMemorySinkFunction(typeInformation, converter))
+                            .setParallelism(1);
+                }
+            };
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new CollectionTableSink(outputType);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return String.format("CollectionTableSink(%s)", outputType);
+        }
+    }
+
+    static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
+        private final long emitIntervalMs;
+
+        public TestCollectionInputFormat(
+                long emitIntervalMs, Collection<T> dataSet, TypeSerializer<T> serializer) {
+            super(dataSet, serializer);
+            this.emitIntervalMs = emitIntervalMs;
+        }
+
+        @Override
+        public boolean reachedEnd() throws IOException {
+            if (emitIntervalMs > 0) {
+                try {
+                    Thread.sleep(emitIntervalMs);
+                } catch (Exception ignored) {
+
+                }
+            }
+            return super.reachedEnd();
+        }
+    }
+
+    static class TemporalTableFetcher extends TableFunction<Row> {
+        private final LinkedList<Row> dimData;
+        private final int[] keyes;
+
+        public TemporalTableFetcher(LinkedList<Row> dimData, int[] keyes) {
+            this.dimData = dimData;
+            this.keyes = keyes;
+        }
+
+        public void eval(Object... values) {
+            for (Row data : dimData) {
+                boolean matched = true;
+                int idx = 0;
+                while (matched && idx < keyes.length) {
+                    Object dimField = data.getField(keyes[idx]);
+                    Object inputField = values[idx];
+                    if (dimField != null) {
+                        matched = dimField.equals(inputField);
+                    } else {
+                        matched = inputField == null;
+                    }
+                    idx += 1;
+                }
+
+                if (matched) {
+                    Row ret = new Row(data.getArity());
+                    for (int i = 0; i < data.getArity(); i++) {
+                        ret.setField(i, data.getField(i));
+                    }
+                    collect(ret);
+                }
+            }
+        }
+    }
+
+    static class UnsafeMemorySinkFunction extends RichSinkFunction<RowData> {
+        private TypeSerializer<Row> serializer;
+
+        private final TypeInformation<Row> outputType;
+        private final DynamicTableSink.DataStructureConverter converter;
+
+        public UnsafeMemorySinkFunction(
+                TypeInformation<Row> outputType,
+                DynamicTableSink.DataStructureConverter converter) {
+            this.outputType = outputType;
+            this.converter = converter;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            serializer = outputType.createSerializer(new ExecutionConfig());
+        }
+
+        @Override
+        public void invoke(RowData value, Context context) throws Exception {
+            RESULT.add(serializer.copy((Row) converter.toExternal(value)));
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java
new file mode 100644
index 00000000000..6b0a85b44ca
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingDescriptors.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Testing Descriptors for python tests. */
+public class TestingDescriptors {
+
+    /** CustomAssigner for testing. */
+    public static class CustomAssigner extends PunctuatedWatermarkAssigner {
+        @Override
+        public Watermark getWatermark(Row row, long timestamp) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /** CustomExtractor for testing. */
+    public static class CustomExtractor extends TimestampExtractor {
+
+        private final String field;
+
+        public CustomExtractor(String field) {
+            this.field = field;
+        }
+
+        public CustomExtractor() {
+            this("ts");
+        }
+
+        @Override
+        public String[] getArgumentFields() {
+            return new String[] {field};
+        }
+
+        @Override
+        public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+            if (argumentFieldTypes[0] != Types.SQL_TIMESTAMP) {
+                throw new ValidationException(
+                        String.format(
+                                "Field 'ts' must be of type Timestamp " + "but is of type %s.",
+                                argumentFieldTypes[0]));
+            }
+        }
+
+        @Override
+        public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+            ResolvedFieldReference fieldAccess = fieldAccesses[0];
+            checkState(fieldAccess.resultType() == Types.SQL_TIMESTAMP);
+            FieldReferenceExpression fieldReferenceExpr =
+                    new FieldReferenceExpression(
+                            fieldAccess.name(),
+                            TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType()),
+                            0,
+                            fieldAccess.fieldIndex());
+            return ApiExpressionUtils.unresolvedCall(
+                    BuiltInFunctionDefinitions.CAST,
+                    fieldReferenceExpr,
+                    ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o instanceof CustomExtractor) {
+                return field.equals(((CustomExtractor) o).field);
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(field);
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java
new file mode 100644
index 00000000000..9bbbf570f53
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingFunctions.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/** Testing Function for python udf tests. */
+public class TestingFunctions {
+
+    /** RichFunc0 for testing. */
+    public static class RichFunc0 extends ScalarFunction {
+        private boolean openCalled = false;
+        private boolean closeCalled = false;
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            if (openCalled) {
+                Assert.fail("Open called more than once.");
+            } else {
+                openCalled = true;
+            }
+
+            if (closeCalled) {
+                Assert.fail("Close called before open.");
+            }
+        }
+
+        public Integer eval(Integer index) {
+            if (!openCalled) {
+                Assert.fail("Open was not called before eval.");
+            }
+
+            if (closeCalled) {
+                Assert.fail("Close called before eval.");
+            }
+
+            return index + 1;
+        }
+
+        @Override
+        public void close() throws Exception {
+            super.close();
+            if (closeCalled) {
+                Assert.fail("Close called more than once.");
+            } else {
+                closeCalled = true;
+            }
+
+            if (!openCalled) {
+                Assert.fail("Open was not called before close.");
+            }
+        }
+    }
+
+    /** MaxAccumulator for testing. */
+    static class MaxAccumulator<T extends Comparable> extends Tuple2<T, Boolean> {}
+
+    /** MaxAggFunction for testing. */
+    abstract static class MaxAggFunction<T extends Comparable>
+            extends AggregateFunction<T, MaxAccumulator<T>> {
+        @Override
+        public T getValue(MaxAccumulator<T> accumulator) {
+            if (accumulator.f1) {
+                return accumulator.f0;
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public MaxAccumulator<T> createAccumulator() {
+            MaxAccumulator<T> accumulator = new MaxAccumulator<>();
+            accumulator.f0 = getInitValue();
+            accumulator.f1 = false;
+            return accumulator;
+        }
+
+        public void accumulate(MaxAccumulator<T> acc, Object value) {
+            if (value != null) {
+                T t = (T) value;
+                if (!acc.f1 || acc.f0.compareTo(t) < 0) {
+                    acc.f0 = t;
+                    acc.f1 = true;
+                }
+            }
+        }
+
+        public void merge(MaxAccumulator<T> accumulator, Iterable<MaxAccumulator<T>> its) {
+            Iterator<MaxAccumulator<T>> iter = its.iterator();
+            while (iter.hasNext()) {
+                MaxAccumulator<T> a = iter.next();
+                if (a.f1) {
+                    accumulate(accumulator, a.f0);
+                }
+            }
+        }
+
+        public void resetAccumulator(MaxAccumulator<T> accumulator) {
+            accumulator.f0 = getInitValue();
+            accumulator.f1 = false;
+        }
+
+        abstract T getInitValue();
+
+        abstract TypeInformation<?> getValueTypeInfo();
+    }
+
+    /** ByteMaxAggFunction for testing. */
+    public static class ByteMaxAggFunction extends MaxAggFunction<Byte> {
+
+        @Override
+        Byte getInitValue() {
+            return 0;
+        }
+
+        @Override
+        TypeInformation<?> getValueTypeInfo() {
+            return Types.BYTE;
+        }
+    }
+
+    /** TableFunc1 for testing. */
+    public static class TableFunc1 extends TableFunction<String> {
+        public void eval(String str) {
+            if (str.contains("#")) {
+                Arrays.stream(str.split("#")).forEach(this::collect);
+            }
+        }
+
+        public void eval(String str, String prefix) {
+            if (str.contains("#")) {
+                Arrays.stream(str.split("#")).forEach(s -> collect(prefix + s));
+            }
+        }
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java
new file mode 100644
index 00000000000..f27c0538425
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinkTableFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** TableFactory for TestingSinks. */
+public class TestingSinkTableFactory implements DynamicTableSinkFactory {
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        return new TestingSinks.TestAppendingSink(context.getPhysicalRowDataType());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "test-sink";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
new file mode 100644
index 00000000000..b7645a67a03
--- /dev/null
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Testing Sinks that collects test output data for validation. */
+public class TestingSinks {
+
+    /** TestAppendingSink for testing. */
+    public static class TestAppendingSink implements DynamicTableSink {
+        private final DataType rowDataType;
+
+        public TestAppendingSink(DataType rowDataType) {
+            this.rowDataType = rowDataType;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return requestedMode;
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            final DataStructureConverter converter =
+                    context.createDataStructureConverter(rowDataType);
+            return (DataStreamSinkProvider)
+                    (providerContext, dataStream) -> dataStream.addSink(new RowSink(converter));
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new TestAppendingSink(rowDataType);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return String.format("TestingAppendSink(%s)", DataType.getFields(rowDataType));
+        }
+    }
+
+    /** RowSink for testing. */
+    static class RowSink implements SinkFunction<RowData> {
+        private final DynamicTableSink.DataStructureConverter converter;
+
+        public RowSink(DynamicTableSink.DataStructureConverter converter) {
+            this.converter = converter;
+        }
+
+        @Override
+        public void invoke(RowData value, Context context) {
+            RowKind rowKind = value.getRowKind();
+            Row data = (Row) converter.toExternal(value);
+            if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
+                RowCollector.addValue(Tuple2.of(true, data));
+            } else {
+                RowCollector.addValue(Tuple2.of(false, data));
+            }
+        }
+    }
+
+    /** RowCollector for testing. */
+    public static class RowCollector {
+        private static final List<Tuple2<Boolean, Row>> SINK = new ArrayList<>();
+
+        public static void addValue(Tuple2<Boolean, Row> value) {
+            final Tuple2<Boolean, Row> copy = new Tuple2<>(value.f0, value.f1);
+            synchronized (SINK) {
+                SINK.add(copy);
+            }
+        }
+
+        public static List<Tuple2<Boolean, Row>> getAndClearValues() {
+            final List<Tuple2<Boolean, Row>> out = new ArrayList<>(SINK);
+            SINK.clear();
+            return out;
+        }
+
+        public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
+            Map<String, Integer> retractedResult = new HashMap<>();
+            results.forEach(
+                    v -> {
+                        int cnt = retractedResult.getOrDefault(v.f1.toString(), 0);
+                        if (v.f0) {
+                            retractedResult.put(v.f1.toString(), cnt + 1);
+                        } else {
+                            retractedResult.put(v.f1.toString(), cnt - 1);
+                        }
+                    });
+
+            if (retractedResult.entrySet().stream().allMatch(entry -> entry.getValue() < 0)) {
+                throw new AssertionError(
+                        "Received retracted rows which have not been " + "accumulated.");
+            }
+            List<String> retractedString = new ArrayList<>();
+            retractedResult.forEach(
+                    (k, v) -> {
+                        for (int i = 0; i < v; i++) {
+                            retractedString.add(k);
+                        }
+                    });
+
+            return retractedString;
+        }
+
+        public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
+            Map<Row, String> upsertResult = new HashMap<>();
+            results.forEach(
+                    v -> {
+                        Row key = Row.project(v.f1, keys);
+                        if (v.f0) {
+                            upsertResult.put(key, v.f1.toString());
+                        } else {
+                            upsertResult.remove(key);
+                        }
+                    });
+            return new ArrayList<>(upsertResult.values());
+        }
+    }
+}
diff --git a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 87%
rename from flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
rename to flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 511a31a088b..679e7f4a8c6 100644
--- a/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-python/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.legacyutils.TestCollectionTableFactory
+org.apache.flink.table.utils.TestCollectionTableFactory
+org.apache.flink.table.utils.TestingSinkTableFactory
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
index c6fa42a98df..b3dfb12730e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonScalarFunction.java
@@ -39,8 +39,8 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
 
     private final String name;
     private final byte[] serializedScalarFunction;
-    private final TypeInformation[] inputTypes;
-    private final TypeInformation resultType;
+    private final DataType[] inputTypes;
+    private final DataType resultType;
     private final PythonFunctionKind pythonFunctionKind;
     private final boolean deterministic;
     private final PythonEnv pythonEnv;
@@ -49,8 +49,8 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
     public PythonScalarFunction(
             String name,
             byte[] serializedScalarFunction,
-            TypeInformation[] inputTypes,
-            TypeInformation resultType,
+            DataType[] inputTypes,
+            DataType resultType,
             PythonFunctionKind pythonFunctionKind,
             boolean deterministic,
             boolean takesRowAsInput,
@@ -98,7 +98,7 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
         if (inputTypes != null) {
-            return inputTypes;
+            return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
         } else {
             return super.getParameterTypes(signature);
         }
@@ -106,7 +106,7 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
 
     @Override
     public TypeInformation getResultType(Class[] signature) {
-        return resultType;
+        return TypeConversions.fromDataTypeToLegacyInfo(resultType);
     }
 
     @Override
@@ -114,15 +114,10 @@ public class PythonScalarFunction extends ScalarFunction implements PythonFuncti
         TypeInference.Builder builder = TypeInference.newBuilder();
         if (inputTypes != null) {
             final List<DataType> argumentDataTypes =
-                    Stream.of(inputTypes)
-                            .map(TypeConversions::fromLegacyInfoToDataType)
-                            .collect(Collectors.toList());
+                    Stream.of(inputTypes).collect(Collectors.toList());
             builder.typedArguments(argumentDataTypes);
         }
-        return builder.outputTypeStrategy(
-                        TypeStrategies.explicit(
-                                TypeConversions.fromLegacyInfoToDataType(resultType)))
-                .build();
+        return builder.outputTypeStrategy(TypeStrategies.explicit(resultType)).build();
     }
 
     @Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
index fc920aee7d9..29df9416efa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.functions.python;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.types.DataType;
@@ -41,8 +40,8 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
 
     private final String name;
     private final byte[] serializedScalarFunction;
-    private final TypeInformation[] inputTypes;
-    private final RowTypeInfo resultType;
+    private final DataType[] inputTypes;
+    private final DataType resultType;
     private final PythonFunctionKind pythonFunctionKind;
     private final boolean deterministic;
     private final PythonEnv pythonEnv;
@@ -51,8 +50,8 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
     public PythonTableFunction(
             String name,
             byte[] serializedScalarFunction,
-            TypeInformation[] inputTypes,
-            RowTypeInfo resultType,
+            DataType[] inputTypes,
+            DataType resultType,
             PythonFunctionKind pythonFunctionKind,
             boolean deterministic,
             boolean takesRowAsInput,
@@ -100,7 +99,7 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
     @Override
     public TypeInformation[] getParameterTypes(Class[] signature) {
         if (inputTypes != null) {
-            return inputTypes;
+            return TypeConversions.fromDataTypeToLegacyInfo(inputTypes);
         } else {
             return super.getParameterTypes(signature);
         }
@@ -108,7 +107,7 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
 
     @Override
     public TypeInformation<Row> getResultType() {
-        return resultType;
+        return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(resultType);
     }
 
     @Override
@@ -116,15 +115,10 @@ public class PythonTableFunction extends TableFunction<Row> implements PythonFun
         TypeInference.Builder builder = TypeInference.newBuilder();
         if (inputTypes != null) {
             final List<DataType> argumentDataTypes =
-                    Stream.of(inputTypes)
-                            .map(TypeConversions::fromLegacyInfoToDataType)
-                            .collect(Collectors.toList());
+                    Stream.of(inputTypes).collect(Collectors.toList());
             builder.typedArguments(argumentDataTypes);
         }
-        return builder.outputTypeStrategy(
-                        TypeStrategies.explicit(
-                                TypeConversions.fromLegacyInfoToDataType(resultType)))
-                .build();
+        return builder.outputTypeStrategy(TypeStrategies.explicit(resultType)).build();
     }
 
     @Override