You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 13:21:54 UTC

[flink] 01/02: [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fe0ec7caa5fdc7d22e6655e557df49b6263ae76
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 19 09:20:31 2021 +0200

    [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python
    
    This is a major cleanup of the Python module that drops support for
    BatchTableEnvironment and the old planner.
    
    Removes usages of:
    - DataSet
    - BatchTableEnvironment
    - Legacy planner
    - ExecutionEnvironment
---
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../flink-python-test/python/python_job.py         |  16 +-
 .../python/tests/FlinkBatchPythonUdfSqlJob.java    |  56 --
 .../python/tests/FlinkStreamPythonUdfSqlJob.java   |  63 ---
 .../test-scripts/test_pyflink.sh                   |  20 -
 flink-python/dev/integration_test.sh               |   3 -
 flink-python/dev/pip_test_code.py                  |  12 +-
 flink-python/docs/index.rst                        |   1 -
 flink-python/docs/pyflink.dataset.rst              |  28 -
 flink-python/docs/pyflink.rst                      |   1 -
 flink-python/pom.xml                               |   8 +-
 .../pyflink/common/tests/test_execution_config.py  |   8 +-
 flink-python/pyflink/dataset/__init__.py           |  27 -
 .../pyflink/dataset/execution_environment.py       | 197 -------
 flink-python/pyflink/dataset/tests/__init__.py     |  17 -
 .../dataset/tests/test_execution_environment.py    | 137 -----
 .../test_execution_environment_completeness.py     |  64 ---
 ...st_stream_execution_environment_completeness.py |   4 +-
 flink-python/pyflink/shell.py                      |  35 --
 flink-python/pyflink/table/__init__.py             |   4 +-
 .../pyflink/table/examples/batch/word_count.py     |   8 +-
 flink-python/pyflink/table/table.py                |   2 +-
 flink-python/pyflink/table/table_environment.py    | 200 +------
 flink-python/pyflink/table/tests/test_calc.py      |  46 +-
 .../pyflink/table/tests/test_dependency.py         |  33 +-
 .../pyflink/table/tests/test_descriptor.py         |  56 +-
 .../pyflink/table/tests/test_pandas_conversion.py  |   7 +-
 .../pyflink/table/tests/test_pandas_udf.py         |  30 +-
 .../pyflink/table/tests/test_set_operation.py      |   4 +-
 .../pyflink/table/tests/test_shell_example.py      |  34 --
 flink-python/pyflink/table/tests/test_sort.py      |   4 +-
 flink-python/pyflink/table/tests/test_sql.py       |  12 +-
 .../table/tests/test_table_environment_api.py      | 629 +--------------------
 flink-python/pyflink/table/tests/test_udf.py       |  23 +-
 flink-python/pyflink/table/tests/test_udtf.py      |  29 +-
 flink-python/pyflink/testing/test_case_utils.py    |  90 +--
 flink-python/setup.py                              |   1 -
 .../flink/table/runtime/arrow/ArrowUtils.java      |  82 +--
 .../AbstractPythonScalarFunctionFlatMap.java       | 119 ----
 .../AbstractPythonStatelessFunctionFlatMap.java    | 312 ----------
 .../python/PythonScalarFunctionFlatMap.java        |  94 ---
 .../python/PythonTableFunctionFlatMap.java         | 174 ------
 .../arrow/ArrowPythonScalarFunctionFlatMap.java    | 131 -----
 .../AbstractRowPythonScalarFunctionOperator.java   |  91 ---
 .../scalar/PythonScalarFunctionOperator.java       |  82 ---
 .../arrow/ArrowPythonScalarFunctionOperator.java   | 135 -----
 .../python/table/PythonTableFunctionOperator.java  | 142 -----
 .../utils/StreamRecordCRowWrappingCollector.java   |  53 --
 .../client/python/PythonFunctionFactoryTest.java   |  29 -
 .../scalar/PythonScalarFunctionOperatorTest.java   | 105 ----
 .../ArrowPythonScalarFunctionOperatorTest.java     | 103 ----
 .../table/PythonTableFunctionOperatorTest.java     |  92 ---
 flink-python/tox.ini                               |   2 +-
 54 files changed, 84 insertions(+), 3575 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md
index 75c925f..0ba2738 100644
--- a/docs/content.zh/docs/dev/python/table/table_environment.md
+++ b/docs/content.zh/docs/dev/python/table/table_environment.md
@@ -48,7 +48,7 @@ table_env = TableEnvironment.create(env_settings)
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md
index ea50ebd..d00e1d2 100644
--- a/docs/content/docs/dev/python/table/table_environment.md
+++ b/docs/content/docs/dev/python/table/table_environment.md
@@ -49,7 +49,7 @@ Alternatively, users can create a `StreamTableEnvironment` from an existing `Str
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index a85e633..2afcf19 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -21,8 +21,7 @@ import shutil
 import sys
 import tempfile
 
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import EnvironmentSettings, TableEnvironment
 
 
 def word_count():
@@ -34,9 +33,8 @@ def word_count():
               "License you may not use this file except in compliance " \
               "with the License"
 
-    t_config = TableConfig()
-    env = ExecutionEnvironment.get_execution_environment()
-    t_env = BatchTableEnvironment.create(env, t_config)
+    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+    t_env = TableEnvironment.create(environment_settings=env_settings)
 
     # used to test pipeline.jars and pipleline.classpaths
     config_key = sys.argv[1]
@@ -68,9 +66,9 @@ def word_count():
             'connector.path' = '{}'
         )
         """.format(result_path)
-    t_env.sql_update(sink_ddl)
+    t_env.execute_sql(sink_ddl)
 
-    t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+    t_env.execute_sql("create temporary system function add_one as 'add_one.add_one' language python")
     t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
 
     elements = [(word, 0) for word in content.split(" ")]
@@ -78,9 +76,7 @@ def word_count():
         .select("word, add_one(count) as count, add_one_java(count) as count_java") \
         .group_by("word") \
         .select("word, count(count) as count, count(count_java) as count_java") \
-        .insert_into("Results")
-
-    t_env.execute("word_count")
+        .execute_insert("Results")
 
 
 if __name__ == '__main__':
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
deleted file mode 100644
index 3f6768c..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink batch mode. */
-public class FlinkBatchPythonUdfSqlJob {
-
-    public static void main(String[] args) {
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-        tEnv.executeSql(
-                "create temporary system function add_one as 'add_one.add_one' language python");
-
-        tEnv.createTemporaryView("source", tEnv.fromDataSet(env.fromElements(1L, 2L, 3L)).as("a"));
-
-        Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
-        List<Long> actual = new ArrayList<>();
-        while (result.hasNext()) {
-            Row r = result.next();
-            actual.add((Long) r.getField(0));
-        }
-
-        List<Long> expected = Arrays.asList(2L, 3L, 4L);
-        if (!actual.equals(expected)) {
-            throw new AssertionError(
-                    String.format(
-                            "The output result: %s is not as expected: %s!", actual, expected));
-        }
-    }
-}
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
deleted file mode 100644
index f90767e..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink stream mode. */
-public class FlinkStreamPythonUdfSqlJob {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env,
-                        EnvironmentSettings.newInstance()
-                                .useOldPlanner()
-                                .inStreamingMode()
-                                .build());
-        tEnv.executeSql(
-                "create temporary system function add_one as 'add_one.add_one' language python");
-
-        tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));
-
-        Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
-        List<Long> actual = new ArrayList<>();
-        while (result.hasNext()) {
-            Row r = result.next();
-            actual.add((Long) r.getField(0));
-        }
-
-        List<Long> expected = Arrays.asList(2L, 3L, 4L);
-        if (!actual.equals(expected)) {
-            throw new AssertionError(
-                    String.format(
-                            "The output result: %s is not as expected: %s!", actual, expected));
-        }
-    }
-}
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index 24ffc59..27cfd0a 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -155,26 +155,6 @@ PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
     -c org.apache.flink.python.tests.BlinkBatchPythonUdfSqlJob \
     "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
 
-echo "Test flink stream python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkStreamPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
-echo "Test flink batch python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkBatchPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
 echo "Test using python udf in sql client:\n"
 SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
 
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index d2f10e5..c598d09 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -36,9 +36,6 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
 # test common module
 test_module "common"
 
-# test dataset module
-test_module "dataset"
-
 # test datastream module
 test_module "datastream"
 
diff --git a/flink-python/dev/pip_test_code.py b/flink-python/dev/pip_test_code.py
index 0904b53..3bd1062 100755
--- a/flink-python/dev/pip_test_code.py
+++ b/flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 # test pyflink shell environment
-from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
 
 import tempfile
 import os
@@ -28,9 +28,9 @@ if os.path.exists(sink_path):
         os.remove(sink_path)
     else:
         shutil.rmtree(sink_path)
-b_env.set_parallelism(1)
-t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-bt_env.connect(FileSystem().path(sink_path)) \
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
+st_env.connect(FileSystem().path(sink_path)) \
     .with_format(OldCsv()
                  .field_delimiter(',')
                  .field("a", DataTypes.BIGINT())
@@ -40,9 +40,9 @@ bt_env.connect(FileSystem().path(sink_path)) \
                  .field("a", DataTypes.BIGINT())
                  .field("b", DataTypes.STRING())
                  .field("c", DataTypes.STRING())) \
-    .create_temporary_table("batch_sink")
+    .create_temporary_table("csv_sink")
 
-t.select("a + 1, b, c").execute_insert("batch_sink").wait()
+t.select("a + 1, b, c").execute_insert("csv_sink").wait()
 
 with open(sink_path, 'r') as f:
     lines = f.read()
diff --git a/flink-python/docs/index.rst b/flink-python/docs/index.rst
index 3d6f538..7a174fd 100644
--- a/flink-python/docs/index.rst
+++ b/flink-python/docs/index.rst
@@ -26,7 +26,6 @@ Welcome to Flink Python API Docs!
    pyflink
    pyflink.common
    pyflink.table
-   pyflink.dataset
    pyflink.datastream
    pyflink.metrics
 
diff --git a/flink-python/docs/pyflink.dataset.rst b/flink-python/docs/pyflink.dataset.rst
deleted file mode 100644
index dc42fcd..0000000
--- a/flink-python/docs/pyflink.dataset.rst
+++ /dev/null
@@ -1,28 +0,0 @@
-.. ################################################################################
-     Licensed to the Apache Software Foundation (ASF) under one
-     or more contributor license agreements.  See the NOTICE file
-     distributed with this work for additional information
-     regarding copyright ownership.  The ASF licenses this file
-     to you under the Apache License, Version 2.0 (the
-     "License"); you may not use this file except in compliance
-     with the License.  You may obtain a copy of the License at
-
-         http://www.apache.org/licenses/LICENSE-2.0
-
-     Unless required by applicable law or agreed to in writing, software
-     distributed under the License is distributed on an "AS IS" BASIS,
-     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     See the License for the specific language governing permissions and
-    limitations under the License.
-   ################################################################################
-
-pyflink.dataset package
-=======================
-
-Module contents
----------------
-
-.. automodule:: pyflink.dataset
-    :members:
-    :undoc-members:
-    :show-inheritance:
diff --git a/flink-python/docs/pyflink.rst b/flink-python/docs/pyflink.rst
index ff17b2a..81ba9f7 100644
--- a/flink-python/docs/pyflink.rst
+++ b/flink-python/docs/pyflink.rst
@@ -27,7 +27,6 @@ Subpackages
 
     pyflink.common
     pyflink.table
-    pyflink.dataset
     pyflink.datastream
 
 .. automodule:: pyflink
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 9bb15ec..7199b2a 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -70,7 +70,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -80,12 +80,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
 
 		<!-- Beam dependencies -->
 
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 8f3ba67..33433df 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.dataset import ExecutionEnvironment
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -24,7 +24,7 @@ from pyflink.testing.test_case_utils import PyFlinkTestCase
 class ExecutionConfigTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = ExecutionEnvironment.get_execution_environment()
+        self.env = StreamExecutionEnvironment.get_execution_environment()
         self.execution_config = self.env.get_config()
 
     def test_constant(self):
@@ -253,9 +253,9 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_equals_and_hash(self):
 
-        config1 = ExecutionEnvironment.get_execution_environment().get_config()
+        config1 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
-        config2 = ExecutionEnvironment.get_execution_environment().get_config()
+        config2 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
         self.assertEqual(config1, config2)
 
diff --git a/flink-python/pyflink/dataset/__init__.py b/flink-python/pyflink/dataset/__init__.py
deleted file mode 100644
index 3bd64cd..0000000
--- a/flink-python/pyflink/dataset/__init__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""
-Important classes of Flink Batch API:
-
-    - :class:`ExecutionEnvironment`:
-      The ExecutionEnvironment is the context in which a batch program is executed.
-"""
-from pyflink.dataset.execution_environment import ExecutionEnvironment
-
-__all__ = ['ExecutionEnvironment']
diff --git a/flink-python/pyflink/dataset/execution_environment.py b/flink-python/pyflink/dataset/execution_environment.py
deleted file mode 100644
index 821189c..0000000
--- a/flink-python/pyflink/dataset/execution_environment.py
+++ /dev/null
@@ -1,197 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
-from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import load_java_class
-
-
-class ExecutionEnvironment(object):
-    """
-    The ExecutionEnvironment is the context in which a program is executed.
-
-    The environment provides methods to control the job execution (such as setting the parallelism)
-    and to interact with the outside world (data access).
-    """
-
-    def __init__(self, j_execution_environment):
-        self._j_execution_environment = j_execution_environment
-
-    def get_parallelism(self) -> int:
-        """
-        Gets the parallelism with which operation are executed by default.
-
-        :return: The parallelism.
-        """
-        return self._j_execution_environment.getParallelism()
-
-    def set_parallelism(self, parallelism: int):
-        """
-        Sets the parallelism for operations executed through this environment.
-        Setting a parallelism of x here will cause all operators to run with
-        x parallel instances.
-
-        :param parallelism: The parallelism.
-        """
-        self._j_execution_environment.setParallelism(parallelism)
-
-    def get_default_local_parallelism(self) -> int:
-        """
-        Gets the default parallelism that will be used for the local execution environment.
-
-        :return: The parallelism.
-        """
-        return self._j_execution_environment.getDefaultLocalParallelism()
-
-    def set_default_local_parallelism(self, parallelism: int):
-        """
-        Sets the default parallelism that will be used for the local execution environment.
-
-        :param parallelism: The parallelism.
-        """
-        self._j_execution_environment.setDefaultLocalParallelism(parallelism)
-
-    def get_config(self) -> ExecutionConfig:
-        """
-        Gets the config object that defines execution parameters.
-
-        :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
-        """
-        return ExecutionConfig(self._j_execution_environment.getConfig())
-
-    def set_restart_strategy(self, restart_strategy_configuration: RestartStrategyConfiguration):
-        """
-        Sets the restart strategy configuration. The configuration specifies which restart strategy
-        will be used for the execution graph in case of a restart.
-
-        Example:
-        ::
-
-            >>> env.set_restart_strategy(RestartStrategies.no_restart())
-
-        :param restart_strategy_configuration: Restart strategy configuration to be set.
-        """
-        self._j_execution_environment.setRestartStrategy(
-            restart_strategy_configuration._j_restart_strategy_configuration)
-
-    def get_restart_strategy(self) -> RestartStrategyConfiguration:
-        """
-        Returns the specified restart strategy configuration.
-
-        :return: The restart strategy configuration to be used.
-        """
-        return RestartStrategies._from_j_restart_strategy(
-            self._j_execution_environment.getRestartStrategy())
-
-    def add_default_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
-        """
-        Adds a new Kryo default serializer to the Runtime.
-
-        Example:
-        ::
-
-            >>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")
-
-        :param type_class_name: The full-qualified java class name of the types serialized with the
-                                given serializer.
-        :param serializer_class_name: The full-qualified java class name of the serializer to use.
-        """
-        type_clz = load_java_class(type_class_name)
-        j_serializer_clz = load_java_class(serializer_class_name)
-        self._j_execution_environment.addDefaultKryoSerializer(type_clz, j_serializer_clz)
-
-    def register_type_with_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
-        """
-        Registers the given Serializer via its class as a serializer for the given type at the
-        KryoSerializer.
-
-        Example:
-        ::
-
-            >>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
-            ...                                        "com.aaa.bbb.Serializer")
-
-        :param type_class_name: The full-qualified java class name of the types serialized with
-                                the given serializer.
-        :param serializer_class_name: The full-qualified java class name of the serializer to use.
-        """
-        type_clz = load_java_class(type_class_name)
-        j_serializer_clz = load_java_class(serializer_class_name)
-        self._j_execution_environment.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)
-
-    def register_type(self, type_class_name: str):
-        """
-        Registers the given type with the serialization stack. If the type is eventually
-        serialized as a POJO, then the type is registered with the POJO serializer. If the
-        type ends up being serialized with Kryo, then it will be registered at Kryo to make
-        sure that only tags are written.
-
-        Example:
-        ::
-
-            >>> env.register_type("com.aaa.bbb.TypeClass")
-
-        :param type_class_name: The full-qualified java class name of the type to register.
-        """
-        type_clz = load_java_class(type_class_name)
-        self._j_execution_environment.registerType(type_clz)
-
-    def execute(self, job_name: str = None) -> JobExecutionResult:
-        """
-        Triggers the program execution. The environment will execute all parts of the program that
-        have resulted in a "sink" operation.
-
-        The program execution will be logged and displayed with the given job name.
-
-        :param job_name: Desired name of the job, optional.
-        :return: The result of the job execution, containing elapsed time and accumulators.
-        """
-        if job_name is None:
-            return JobExecutionResult(self._j_execution_environment.execute())
-        else:
-            return JobExecutionResult(self._j_execution_environment.execute(job_name))
-
-    def get_execution_plan(self) -> str:
-        """
-        Creates the plan with which the system will execute the program, and returns it as
-        a String using a JSON representation of the execution data flow graph.
-        Note that this needs to be called, before the plan is executed.
-
-        If the compiler could not be instantiated, or the master could not
-        be contacted to retrieve information relevant to the execution planning,
-        an exception will be thrown.
-
-        :return: The execution plan of the program, as a JSON String.
-        """
-        return self._j_execution_environment.getExecutionPlan()
-
-    @staticmethod
-    def get_execution_environment() -> 'ExecutionEnvironment':
-        """
-        Creates an execution environment that represents the context in which the program is
-        currently executed. If the program is invoked standalone, this method returns a local
-        execution environment. If the program is invoked from within the command line client to be
-        submitted to a cluster, this method returns the execution environment of this cluster.
-
-        :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
-        """
-        gateway = get_gateway()
-        j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
-            .getExecutionEnvironment()
-        return ExecutionEnvironment(j_execution_environment)
diff --git a/flink-python/pyflink/dataset/tests/__init__.py b/flink-python/pyflink/dataset/tests/__init__.py
deleted file mode 100644
index 65b48d4..0000000
--- a/flink-python/pyflink/dataset/tests/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
deleted file mode 100644
index 49ed8c9..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ /dev/null
@@ -1,137 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import json
-import os
-import tempfile
-import time
-
-import unittest
-
-from pyflink.common import ExecutionConfig, RestartStrategies
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import DataTypes, BatchTableEnvironment, CsvTableSource, CsvTableSink
-from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table
-
-
-class ExecutionEnvironmentTests(PyFlinkTestCase):
-
-    def setUp(self):
-        self.env = ExecutionEnvironment.get_execution_environment()
-
-    def test_get_set_parallelism(self):
-
-        self.env.set_parallelism(10)
-
-        parallelism = self.env.get_parallelism()
-
-        self.assertEqual(parallelism, 10)
-
-    def test_get_set_default_local_parallelism(self):
-
-        self.env.set_default_local_parallelism(8)
-
-        parallelism = self.env.get_default_local_parallelism()
-
-        self.assertEqual(parallelism, 8)
-
-    def test_get_config(self):
-
-        execution_config = self.env.get_config()
-
-        self.assertIsInstance(execution_config, ExecutionConfig)
-
-    def test_set_get_restart_strategy(self):
-
-        self.env.set_restart_strategy(RestartStrategies.no_restart())
-
-        restart_strategy = self.env.get_restart_strategy()
-
-        self.assertEqual(restart_strategy, RestartStrategies.no_restart())
-
-    def test_add_default_kryo_serializer(self):
-
-        self.env.add_default_kryo_serializer(
-            "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
-            "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
-        class_dict = self.env.get_config().get_default_kryo_serializer_classes()
-
-        self.assertEqual(class_dict,
-                         {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
-                          'org.apache.flink.runtime.state'
-                          '.StateBackendTestBase$CustomKryoTestSerializer'})
-
-    def test_register_type_with_kryo_serializer(self):
-
-        self.env.register_type_with_kryo_serializer(
-            "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
-            "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
-        class_dict = self.env.get_config().get_registered_types_with_kryo_serializer_classes()
-
-        self.assertEqual(class_dict,
-                         {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
-                          'org.apache.flink.runtime.state'
-                          '.StateBackendTestBase$CustomKryoTestSerializer'})
-
-    def test_register_type(self):
-
-        self.env.register_type("org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
-        type_list = self.env.get_config().get_registered_pojo_types()
-
-        self.assertEqual(type_list,
-                         ["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])
-
-    @unittest.skip("Python API does not support DataSet now. refactor this test later")
-    def test_get_execution_plan(self):
-        tmp_dir = tempfile.gettempdir()
-        source_path = os.path.join(tmp_dir + '/streaming.csv')
-        tmp_csv = os.path.join(tmp_dir + '/streaming2.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-
-        t_env = BatchTableEnvironment.create(self.env)
-        csv_source = CsvTableSource(source_path, field_names, field_types)
-        t_env.register_table_source("Orders", csv_source)
-        t_env.register_table_sink(
-            "Results",
-            CsvTableSink(field_names, field_types, tmp_csv))
-        t_env.from_path("Orders").execute_insert("Results").wait()
-
-        plan = self.env.get_execution_plan()
-
-        json.loads(plan)
-
-    def test_execute(self):
-        tmp_dir = tempfile.gettempdir()
-        field_names = ['a', 'b', 'c']
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env = BatchTableEnvironment.create(self.env)
-        t_env.register_table_sink(
-            'Results',
-            CsvTableSink(field_names, field_types,
-                         os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
-        execution_result = exec_insert_table(
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']),
-            'Results')
-        self.assertIsNotNone(execution_result.get_job_id())
-        self.assertIsNotNone(execution_result.get_net_runtime())
-        self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
-        self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
-        self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
deleted file mode 100644
index 2d49844..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ /dev/null
@@ -1,64 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, PyFlinkTestCase
-
-
-class ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
-                                            PyFlinkTestCase):
-
-    @classmethod
-    def python_class(cls):
-        return ExecutionEnvironment
-
-    @classmethod
-    def java_class(cls):
-        return "org.apache.flink.api.java.ExecutionEnvironment"
-
-    @classmethod
-    def excluded_methods(cls):
-        # Exclude these methods for the time being, because current
-        # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
-        # DataSet/DataStream API, but to the Table API configuration.
-        # Currently only the methods for configuration is added.
-        # 'setSessionTimeout', 'getSessionTimeout', 'setNumberOfExecutionRetries',
-        # 'getNumberOfExecutionRetries' is deprecated, exclude them.
-        # 'access$000' is generated by java compiler, exclude it too.
-        return {'resetContextEnvironment', 'getSessionTimeout', 'fromParallelCollection',
-                'getId', 'registerCachedFile', 'setNumberOfExecutionRetries', 'readTextFile',
-                'getNumberOfExecutionRetries', 'registerCachedFilesWithPlan',
-                'getLastJobExecutionResult', 'readCsvFile', 'initializeContextEnvironment',
-                'createLocalEnvironment', 'createLocalEnvironmentWithWebUI', 'createProgramPlan',
-                'getIdString', 'setSessionTimeout', 'fromElements', 'createRemoteEnvironment',
-                'startNewSession', 'fromCollection', 'readTextFileWithValue', 'registerDataSink',
-                'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
-                'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
-                'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
-                'executeAsync', 'registerJobListener', 'clearJobListeners', 'configure'}
-
-
-if __name__ == '__main__':
-    import unittest
-
-    try:
-        import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
-    except ImportError:
-        testRunner = None
-    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 890df9c..27d7c37 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -34,8 +34,8 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
     @classmethod
     def excluded_methods(cls):
         # Exclude these methods for the time being, because current
-        # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
-        # DataSet/DataStream API, but to the Table API configuration.
+        # StreamExecutionEnvironment does not apply to the
+        # DataStream API, but to the Table API configuration.
         # Currently only the methods for configuration is added.
         # 'isForceCheckpointing', 'getNumberOfExecutionRetries', 'setNumberOfExecutionRetries'
         # is deprecated, exclude them.
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index b5c9dc0..841991e 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -20,7 +20,6 @@ import platform
 import sys
 
 from pyflink.common import *
-from pyflink.dataset import *
 from pyflink.datastream import *
 from pyflink.table import *
 from pyflink.table.catalog import *
@@ -73,36 +72,6 @@ welcome_msg = u'''
 
 NOTE: Use the prebound Table Environment to implement batch or streaming Table programs.
 
-  Batch - Use 'b_env' and 'bt_env' variables
-
-    *
-    * import tempfile
-    * import os
-    * import shutil
-    * sink_path = tempfile.gettempdir() + '/batch.csv'
-    * if os.path.exists(sink_path):
-    *     if os.path.isfile(sink_path):
-    *         os.remove(sink_path)
-    *     else:
-    *         shutil.rmtree(sink_path)
-    * b_env.set_parallelism(1)
-    * t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-    * bt_env.connect(FileSystem().path(sink_path)) \\
-    *     .with_format(OldCsv()
-    *                  .field_delimiter(',')
-    *                  .field("a", DataTypes.BIGINT())
-    *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING())) \\
-    *     .with_schema(Schema()
-    *                  .field("a", DataTypes.BIGINT())
-    *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING())) \\
-    *     .create_temporary_table("batch_sink")
-    *
-    * t.select("a + 1, b, c").insert_into("batch_sink")
-    *
-    * bt_env.execute("batch_job")
-
   Streaming - Use 's_env' and 'st_env' variables
 
     *
@@ -135,10 +104,6 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
 '''
 utf8_out.write(welcome_msg)
 
-b_env = ExecutionEnvironment.get_execution_environment()
-
-bt_env = BatchTableEnvironment.create(b_env)
-
 s_env = StreamExecutionEnvironment.get_execution_environment()
 
 st_env = StreamTableEnvironment.create(s_env)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 75436c9..1cf4313 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -81,8 +81,7 @@ from pyflink.table.statement_set import StatementSet
 from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
-                                             BatchTableEnvironment)
+from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment)
 from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes, UserDefinedType, Row, RowKind
@@ -91,7 +90,6 @@ from pyflink.table.udf import FunctionContext, ScalarFunction, TableFunction, Ag
 
 __all__ = [
     'AggregateFunction',
-    'BatchTableEnvironment',
     'CsvTableSink',
     'CsvTableSource',
     'DataTypes',
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index ec49fb4..2c0d930 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -21,7 +21,7 @@ import shutil
 import sys
 import tempfile
 
-from pyflink.table import BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import EnvironmentSettings, TableEnvironment
 from pyflink.table import expressions as expr
 
 
@@ -35,7 +35,7 @@ def word_count():
               "with the License"
 
     env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
-    t_env = BatchTableEnvironment.create(environment_settings=env_settings)
+    t_env = TableEnvironment.create(environment_settings=env_settings)
 
     # register Results table in table environment
     tmp_dir = tempfile.gettempdir()
@@ -67,9 +67,7 @@ def word_count():
     table = t_env.from_elements(elements, ["word", "count"])
     table.group_by(table.word) \
          .select(table.word, expr.lit(1).count.alias('count')) \
-         .insert_into("Results")
-
-    t_env.execute("word_count")
+         .execute_insert("Results")
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index ccb3aa4..9e08278 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -44,7 +44,7 @@ class Table(object):
 
     """
     A :class:`~pyflink.table.Table` is the core component of the Table API.
-    Similar to how the batch and streaming APIs have DataSet and DataStream,
+    Similar to how the DataStream API has DataStream,
     the Table API is built around :class:`~pyflink.table.Table`.
 
     Use the methods of :class:`~pyflink.table.Table` to transform data.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 2114dad..82d4557 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -30,13 +30,12 @@ from pyflink.common.typeinfo import TypeInformation
 from pyflink.datastream.data_stream import DataStream
 
 from pyflink.common import JobExecutionResult
-from pyflink.dataset import ExecutionEnvironment
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \
     Module, ModuleEntry, TableSink
 from pyflink.table.catalog import Catalog
-from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescriptor, \
+from pyflink.table.descriptors import StreamTableDescriptor, \
     ConnectorDescriptor, ConnectTableDescriptor
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.statement_set import StatementSet
@@ -46,14 +45,13 @@ from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, D
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema, \
     _to_java_data_type
 from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
-    UserDefinedAggregateFunctionWrapper, udtaf, TableAggregateFunction
+    udtaf, TableAggregateFunction
 from pyflink.table.utils import to_expression_jarray
 from pyflink.util import java_utils
 from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
     to_j_explain_detail_arr, to_jarray
 
 __all__ = [
-    'BatchTableEnvironment',
     'StreamTableEnvironment',
     'TableEnvironment'
 ]
@@ -93,7 +91,6 @@ class TableEnvironment(object):
 
     def __init__(self, j_tenv, serializer=PickleSerializer()):
         self._j_tenv = j_tenv
-        self._is_blink_planner = TableEnvironment._judge_blink_planner(j_tenv)
         self._serializer = serializer
         # When running in MiniCluster, launch the Python UDF worker using the Python executable
         # specified by sys.executable if users have not specified it explicitly via configuration
@@ -115,16 +112,6 @@ class TableEnvironment(object):
             environment_settings._j_environment_settings)
         return TableEnvironment(j_tenv)
 
-    @staticmethod
-    def _judge_blink_planner(j_tenv):
-        if "getPlanner" not in dir(j_tenv):
-            return False
-        else:
-            j_planner_class = j_tenv.getPlanner().getClass()
-            j_blink_planner_class = get_java_class(
-                get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
-            return j_blink_planner_class.isAssignableFrom(j_planner_class)
-
     def from_table_source(self, table_source: 'TableSource') -> 'Table':
         """
         Creates a table from a table source.
@@ -1082,8 +1069,7 @@ class TableEnvironment(object):
             .loadClass(function_class_name).newInstance()
         # this is a temporary solution and will be unified later when we use the new type
         # system(DataType) to replace the old type system(TypeInformation).
-        if (self._is_blink_planner and not isinstance(self, StreamTableEnvironment)) or \
-                self.__class__ == TableEnvironment:
+        if not isinstance(self, StreamTableEnvironment) or self.__class__ == TableEnvironment:
             if self._is_table_function(java_function):
                 self._register_table_function(name, java_function)
             elif self._is_aggregate_function(java_function):
@@ -1128,8 +1114,7 @@ class TableEnvironment(object):
         java_function = function._java_user_defined_function()
         # this is a temporary solution and will be unified later when we use the new type
         # system(DataType) to replace the old type system(TypeInformation).
-        if (self._is_blink_planner and isinstance(self, BatchTableEnvironment)) or \
-                self.__class__ == TableEnvironment:
+        if self.__class__ == TableEnvironment:
             if self._is_table_function(java_function):
                 self._register_table_function(name, java_function)
             elif self._is_aggregate_function(java_function):
@@ -1442,14 +1427,10 @@ class TableEnvironment(object):
             execution_config = self._get_j_env().getConfig()
             gateway = get_gateway()
             j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
-            if self._is_blink_planner:
-                PythonTableUtils = gateway.jvm \
-                    .org.apache.flink.table.planner.utils.python.PythonTableUtils
-                PythonInputFormatTableSource = gateway.jvm \
-                    .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
-            else:
-                PythonTableUtils = gateway.jvm.PythonTableUtils
-                PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
+            PythonTableUtils = gateway.jvm \
+                .org.apache.flink.table.planner.utils.python.PythonTableUtils
+            PythonInputFormatTableSource = gateway.jvm \
+                .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
             j_input_format = PythonTableUtils.getInputFormat(
                 j_objs, row_type_info, execution_config)
             j_table_source = PythonInputFormatTableSource(
@@ -1489,10 +1470,6 @@ class TableEnvironment(object):
         .. versionadded:: 1.11.0
         """
 
-        if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
-            raise TypeError("It doesn't support to convert from Pandas DataFrame in the batch "
-                            "mode of old planner")
-
         import pandas as pd
         if not isinstance(pdf, pd.DataFrame):
             raise TypeError("Unsupported type, expected pandas.DataFrame, got %s" % type(pdf))
@@ -1535,9 +1512,8 @@ class TableEnvironment(object):
 
             data_type = jvm.org.apache.flink.table.types.utils.TypeConversions\
                 .fromLegacyInfoToDataType(_to_java_type(result_type)).notNull()
-            if self._is_blink_planner:
-                data_type = data_type.bridgedTo(
-                    load_java_class('org.apache.flink.table.data.RowData'))
+            data_type = data_type.bridgedTo(
+                load_java_class('org.apache.flink.table.data.RowData'))
 
             j_arrow_table_source = \
                 jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
@@ -1566,13 +1542,7 @@ class TableEnvironment(object):
             j_configuration.setString(config_key, ";".join(jar_urls_set))
 
     def _get_j_env(self):
-        if self._is_blink_planner:
-            return self._j_tenv.getPlanner().getExecEnv()
-        else:
-            try:
-                return self._j_tenv.execEnv()
-            except:
-                return self._j_tenv.getPlanner().getExecutionEnvironment()
+        return self._j_tenv.getPlanner().getExecEnv()
 
     @staticmethod
     def _is_table_function(java_function):
@@ -1618,10 +1588,6 @@ class TableEnvironment(object):
         self._add_jars_to_j_env_config(classpaths_key)
 
     def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
-        if isinstance(function, (AggregateFunction, TableAggregateFunction,
-                                 UserDefinedAggregateFunctionWrapper)):
-            if not self._is_blink_planner:
-                raise Exception("Python UDAF and UDTAF are only supported in blink planner")
         if isinstance(function, AggregateFunction):
             function = udaf(function,
                             result_type=function.get_result_type(),
@@ -1794,147 +1760,3 @@ class StreamTableEnvironment(TableEnvironment):
         """
         j_data_stream = self._j_tenv.toRetractStream(table._j_table, type_info.get_java_type_info())
         return DataStream(j_data_stream=j_data_stream)
-
-
-class BatchTableEnvironment(TableEnvironment):
-    """
-    .. note:: BatchTableEnvironment will be dropped in Flink 1.14 because it only supports the old
-              planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, which
-              supports both batch and streaming. More advanced operations previously covered by
-              the DataSet API can now use the DataStream API in BATCH execution mode.
-    """
-
-    def __init__(self, j_tenv):
-        super(BatchTableEnvironment, self).__init__(j_tenv)
-        self._j_tenv = j_tenv
-
-    def connect(self, connector_descriptor: ConnectorDescriptor) -> \
-            Union[BatchTableDescriptor, StreamTableDescriptor]:
-        """
-        Creates a temporary table from a descriptor.
-
-        Descriptors allow for declaring the communication to external systems in an
-        implementation-agnostic way. The classpath is scanned for suitable table factories that
-        match the desired configuration.
-
-        The following example shows how to read from a connector using a JSON format and
-        registering a temporary table as "MyTable":
-        ::
-
-            >>> table_env \\
-            ...     .connect(ExternalSystemXYZ()
-            ...              .version("0.11")) \\
-            ...     .with_format(Json()
-            ...                  .json_schema("{...}")
-            ...                  .fail_on_missing_field(False)) \\
-            ...     .with_schema(Schema()
-            ...                  .field("user-name", "VARCHAR")
-            ...                  .from_origin_field("u_name")
-            ...                  .field("count", "DECIMAL")) \\
-            ...     .create_temporary_table("MyTable")
-
-        :param connector_descriptor: Connector descriptor describing the external system.
-        :return: A :class:`~pyflink.table.descriptors.BatchTableDescriptor` or a
-                 :class:`~pyflink.table.descriptors.StreamTableDescriptor` (for blink planner) used
-                 to build the temporary table.
-
-        .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
-        """
-        warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
-        gateway = get_gateway()
-        blink_t_env_class = get_java_class(
-            gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
-        if blink_t_env_class == self._j_tenv.getClass():
-            return StreamTableDescriptor(
-                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
-        else:
-            return BatchTableDescriptor(
-                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
-
-    @staticmethod
-    def create(execution_environment: ExecutionEnvironment = None,  # type: ignore
-               table_config: TableConfig = None,
-               environment_settings: EnvironmentSettings = None) -> 'BatchTableEnvironment':
-        """
-        Creates a :class:`~pyflink.table.BatchTableEnvironment`.
-
-        Example:
-        ::
-
-            # create with ExecutionEnvironment.
-            >>> env = ExecutionEnvironment.get_execution_environment()
-            >>> table_env = BatchTableEnvironment.create(env)
-            # create with ExecutionEnvironment and TableConfig.
-            >>> table_config = TableConfig()
-            >>> table_config.set_null_check(False)
-            >>> table_env = BatchTableEnvironment.create(env, table_config)
-            # create with EnvironmentSettings.
-            >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\
-            ...     .use_blink_planner().build()
-            >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
-
-        :param execution_environment: The batch :class:`~pyflink.dataset.ExecutionEnvironment` of
-                                      the TableEnvironment.
-        :param table_config: The configuration of the TableEnvironment, optional.
-        :param environment_settings: The environment settings used to instantiate the
-                                     TableEnvironment. It provides the interfaces about planner
-                                     selection(flink or blink), optional.
-        :return: The BatchTableEnvironment created from given ExecutionEnvironment and
-                 configuration.
-
-        .. note:: This part of the API will be dropped in Flink 1.14 because it only supports the
-                  old planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, it
-                  supports both batch and streaming. For more advanced operations, the new batch
-                  mode of the DataStream API might be useful.
-        """
-        warnings.warn(
-            "Deprecated in 1.14. Use the unified TableEnvironment instead.",
-            DeprecationWarning)
-        if execution_environment is None and \
-                table_config is None and \
-                environment_settings is None:
-            raise ValueError("No argument found, the param 'execution_environment' "
-                             "or 'environment_settings' is required.")
-        elif execution_environment is None and \
-                table_config is not None and \
-                environment_settings is None:
-            raise ValueError("Only the param 'table_config' is found, "
-                             "the param 'execution_environment' is also required.")
-        elif execution_environment is not None and \
-                environment_settings is not None:
-            raise ValueError("The param 'execution_environment' and "
-                             "'environment_settings' cannot be used at the same time")
-        elif table_config is not None and \
-                environment_settings is not None:
-            raise ValueError("The param 'table_config' and "
-                             "'environment_settings' cannot be used at the same time")
-
-        gateway = get_gateway()
-        if environment_settings is not None:
-            if environment_settings.is_streaming_mode():
-                raise ValueError("The environment settings for BatchTableEnvironment must be "
-                                 "set to batch mode.")
-            JEnvironmentSettings = get_gateway().jvm.org.apache.flink.table.api.EnvironmentSettings
-
-            old_planner_class_name = EnvironmentSettings.new_instance().in_batch_mode() \
-                .use_old_planner().build()._j_environment_settings \
-                .toPlannerProperties()[JEnvironmentSettings.CLASS_NAME]
-            planner_properties = environment_settings._j_environment_settings.toPlannerProperties()
-            if JEnvironmentSettings.CLASS_NAME in planner_properties and \
-                    planner_properties[JEnvironmentSettings.CLASS_NAME] == old_planner_class_name:
-                # The Java EnvironmentSettings API does not support creating table environment with
-                # old planner. Create it from other API.
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    ExecutionEnvironment.get_execution_environment()._j_execution_environment)
-            else:
-                j_tenv = gateway.jvm.TableEnvironment.create(
-                    environment_settings._j_environment_settings)
-        else:
-            if table_config is None:
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    execution_environment._j_execution_environment)
-            else:
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    execution_environment._j_execution_environment,
-                    table_config._j_table_config)
-        return BatchTableEnvironment(j_tenv)
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index c51974f..f09b1fa 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -21,7 +21,7 @@ import datetime
 from decimal import Decimal
 
 from pyflink.common import Row
-from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import DataTypes
 from pyflink.table.expressions import row
 from pyflink.table.tests.test_types import PythonOnlyPoint, PythonOnlyUDT
 from pyflink.testing import source_sink_utils
@@ -122,50 +122,6 @@ class StreamTableCalcTests(PyFlinkBlinkStreamTableTestCase):
         expected = ['+I[1, abc, 2.0]', '+I[2, def, 3.0]']
         self.assert_equals(actual, expected)
 
-    def test_blink_from_element(self):
-        t_env = BatchTableEnvironment.create(environment_settings=EnvironmentSettings
-                                             .new_instance().use_blink_planner()
-                                             .in_batch_mode().build())
-        field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
-                       "i", "j", "k", "l", "m", "n", "o", "p", "q"]
-        field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(), DataTypes.STRING(),
-                       DataTypes.STRING(), DataTypes.DATE(),
-                       DataTypes.TIME(),
-                       DataTypes.TIMESTAMP(3),
-                       DataTypes.INTERVAL(DataTypes.SECOND(3)),
-                       DataTypes.ARRAY(DataTypes.DOUBLE()),
-                       DataTypes.ARRAY(DataTypes.DOUBLE(False)),
-                       DataTypes.ARRAY(DataTypes.STRING()),
-                       DataTypes.ARRAY(DataTypes.DATE()),
-                       DataTypes.DECIMAL(38, 18),
-                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]),
-                       DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()),
-                       DataTypes.BYTES(),
-                       PythonOnlyUDT()]
-        schema = DataTypes.ROW(
-            list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
-                     field_names,
-                     field_types)))
-        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
-        t_env.register_table_sink("Results", table_sink)
-        t = t_env.from_elements(
-            [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), datetime.time(1, 0, 0),
-              datetime.datetime(1970, 1, 2, 0, 0),
-              datetime.timedelta(days=1, microseconds=10),
-              [1.0, None], array.array("d", [1.0, 2.0]),
-              ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a", "b")(1, 2.0),
-              {"key": 1.0}, bytearray(b'ABCD'),
-              PythonOnlyPoint(3.0, 4.0))],
-            schema)
-        t.execute_insert("Results").wait()
-        actual = source_sink_utils.results()
-
-        expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00:00, 1970-01-02 00:00:00.0, '
-                    '86400000, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
-                    '1.000000000000000000, +I[1, 2.0], {key=1.0}, [65, 66, 67, 68], [3.0, 4.0]]']
-        self.assert_equals(actual, expected)
-
 
 if __name__ == '__main__':
     import unittest
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b3a0bca..7567b93 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -27,9 +27,7 @@ from pyflink.table import expressions as expr
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import (PyFlinkBlinkStreamTableTestCase,
-                                             PyFlinkBlinkBatchTableTestCase,
-                                             PyFlinkOldStreamTableTestCase,
-                                             PyFlinkOldBatchTableTestCase)
+                                             PyFlinkBlinkBatchTableTestCase)
 
 
 class DependencyTests(object):
@@ -67,35 +65,6 @@ class DependencyTests(object):
         self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
 
 
-class FlinkStreamDependencyTests(DependencyTests, PyFlinkOldStreamTableTestCase):
-
-    pass
-
-
-class FlinkBatchDependencyTests(PyFlinkOldBatchTableTestCase):
-
-    def test_add_python_file(self):
-        python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
-        os.mkdir(python_file_dir)
-        python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py")
-        with open(python_file_path, 'w') as f:
-            f.write("def add_two(a):\n    return a + 2")
-        self.t_env.add_python_file(python_file_path)
-
-        def plus_two(i):
-            from test_dependency_manage_lib import add_two
-            return add_two(i)
-
-        self.t_env.create_temporary_system_function(
-            "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
-
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        t = t.select(expr.call('add_two', t.a), t.a)
-
-        result = self.collect(t)
-        self.assertEqual(result, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-
-
 class BlinkBatchDependencyTests(DependencyTests, PyFlinkBlinkBatchTableTestCase):
 
     pass
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 92bada3..e15bf7f 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -25,9 +25,7 @@ from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafk
                                        CustomFormatDescriptor)
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes
-from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkOldStreamTableTestCase,
-                                             PyFlinkOldBatchTableTestCase,
-                                             _load_specific_flink_module_jars)
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, _load_specific_flink_module_jars)
 
 
 class FileSystemDescriptorTests(PyFlinkTestCase):
@@ -1080,58 +1078,6 @@ class AbstractTableDescriptorTests(object):
             assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
 
 
-class StreamTableDescriptorTests(PyFlinkOldStreamTableTestCase, AbstractTableDescriptorTests):
-
-    def test_in_append_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor\
-            .with_format(OldCsv())\
-            .in_append_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'append',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-    def test_in_retract_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor \
-            .with_format(OldCsv()) \
-            .in_retract_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'retract',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-    def test_in_upsert_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor \
-            .with_format(OldCsv()) \
-            .in_upsert_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'upsert',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-
-class BatchTableDescriptorTests(PyFlinkOldBatchTableTestCase, AbstractTableDescriptorTests):
-    pass
-
-
 if __name__ == '__main__':
     import unittest
 
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 4f20e24..c9c0986 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -24,7 +24,7 @@ from pyflink.common import Row
 from pyflink.table.types import DataTypes
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkOldStreamTableTestCase
+    PyFlinkBlinkStreamTableTestCase
 
 
 class PandasConversionTestBase(object):
@@ -172,11 +172,6 @@ class PandasConversionITTests(PandasConversionTestBase):
             self.assertTrue(expected_field == result_field)
 
 
-class StreamPandasConversionTests(PandasConversionITTests,
-                                  PyFlinkOldStreamTableTestCase):
-    pass
-
-
 class BlinkBatchPandasConversionTests(PandasConversionTests,
                                       PandasConversionITTests,
                                       PyFlinkBlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 8d3334d..77061ec 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -24,9 +24,8 @@ from pyflink.table import DataTypes
 from pyflink.table.tests.test_udf import SubtractOne
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, \
-    PyFlinkTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkTestCase
 
 
 class PandasUDFTests(PyFlinkTestCase):
@@ -307,9 +306,6 @@ class PandasUDFITTests(object):
         with self.assertRaisesRegex(Py4JJavaError, expected_regex=msg):
             t.select(result_type_not_series(t.a)).to_pandas()
 
-
-class BlinkPandasUDFITTests(object):
-
     def test_data_types_only_supported_in_blink_planner(self):
         import pandas as pd
 
@@ -342,34 +338,12 @@ class BlinkPandasUDFITTests(object):
         self.assert_equals(actual, ["+I[1970-01-02T00:00:00.123Z]"])
 
 
-class StreamPandasUDFITTests(PandasUDFITTests,
-                             PyFlinkOldStreamTableTestCase):
-    pass
-
-
-class BatchPandasUDFITTests(PyFlinkOldBatchTableTestCase):
-
-    def test_basic_functionality(self):
-        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT(), func_type="pandas")
-
-        # general Python UDF
-        subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
-        t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
-        t = t.where(add_one(t.b) <= 3) \
-            .select(t.a, t.b + 1, add(t.a + 1, subtract_one(t.c)) + 2, add(add_one(t.a), 1))
-        result = self.collect(t)
-        self.assert_equals(result, ["+I[1, 3, 6, 3]", "+I[3, 2, 14, 5]"])
-
-
 class BlinkBatchPandasUDFITTests(PandasUDFITTests,
-                                 BlinkPandasUDFITTests,
                                  PyFlinkBlinkBatchTableTestCase):
     pass
 
 
 class BlinkStreamPandasUDFITTests(PandasUDFITTests,
-                                  BlinkPandasUDFITTests,
                                   PyFlinkBlinkStreamTableTestCase):
     pass
 
diff --git a/flink-python/pyflink/table/tests/test_set_operation.py b/flink-python/pyflink/table/tests/test_set_operation.py
index 51da302..c25ec59 100644
--- a/flink-python/pyflink/table/tests/test_set_operation.py
+++ b/flink-python/pyflink/table/tests/test_set_operation.py
@@ -16,10 +16,10 @@
 # # limitations under the License.
 ################################################################################
 
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
 
 
-class StreamTableSetOperationTests(PyFlinkOldBatchTableTestCase):
+class StreamTableSetOperationTests(PyFlinkBlinkBatchTableTestCase):
 
     data1 = [(1, "Hi", "Hello")]
     data2 = [(3, "Hello", "Hello")]
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 03dd197..60dae68 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -23,40 +23,6 @@ class ShellExampleTests(PyFlinkTestCase):
     If these tests failed, please fix these examples code and copy them to shell.py
     """
 
-    def test_batch_case(self):
-        from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
-        # example begin
-
-        import tempfile
-        import os
-        import shutil
-        sink_path = tempfile.gettempdir() + '/batch.csv'
-        if os.path.exists(sink_path):
-            if os.path.isfile(sink_path):
-                os.remove(sink_path)
-            else:
-                shutil.rmtree(sink_path)
-        b_env.set_parallelism(1)
-        t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-        bt_env.connect(FileSystem().path(sink_path))\
-            .with_format(OldCsv()
-                         .field_delimiter(',')
-                         .field("a", DataTypes.BIGINT())
-                         .field("b", DataTypes.STRING())
-                         .field("c", DataTypes.STRING()))\
-            .with_schema(Schema()
-                         .field("a", DataTypes.BIGINT())
-                         .field("b", DataTypes.STRING())
-                         .field("c", DataTypes.STRING()))\
-            .create_temporary_table("batch_sink")
-
-        t.select("a + 1, b, c").execute_insert("batch_sink").wait()
-
-        # verify code, do not copy these code to shell.py
-        with open(sink_path, 'r') as f:
-            lines = f.read()
-            self.assertEqual(lines, '2,hi,hello\n' + '3,hi,hello\n')
-
     def test_stream_case(self):
         from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
         # example begin
diff --git a/flink-python/pyflink/table/tests/test_sort.py b/flink-python/pyflink/table/tests/test_sort.py
index d0b787c..3e78689 100644
--- a/flink-python/pyflink/table/tests/test_sort.py
+++ b/flink-python/pyflink/table/tests/test_sort.py
@@ -16,10 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
 
 
-class BatchTableSortTests(PyFlinkOldBatchTableTestCase):
+class BatchTableSortTests(PyFlinkBlinkBatchTableTestCase):
 
     def test_order_by_offset_fetch(self):
         t = self.t_env.from_elements([(1, "Hello")], ["a", "b"])
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 5a102ac..27ebc87 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -24,7 +24,7 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes, ResultKind
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
-    PyFlinkOldBatchTableTestCase, PyFlinkTestCase
+    PyFlinkTestCase
 
 
 class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
@@ -110,16 +110,6 @@ class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
         self.assert_equals(actual, expected)
 
 
-class BatchSqlTests(PyFlinkOldBatchTableTestCase):
-
-    def test_sql_ddl(self):
-        self.t_env.execute_sql("create temporary function func1 as "
-                               "'pyflink.table.tests.test_udf.add' language python")
-        table = self.t_env.from_elements([(1, 2)]).alias("a, b").select("func1(a, b)")
-        plan = table.explain()
-        self.assertTrue(plan.find("DataSetPythonCalc(select=[add(f0, f1) AS _c0])") >= 0)
-
-
 class JavaSqlTests(PyFlinkTestCase):
     """
     We need to start these Java tests from python process to make sure that Python environment is
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3774178..27cb89a 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -17,33 +17,25 @@
 ################################################################################
 import datetime
 import decimal
-import glob
 import os
-import pathlib
 import sys
 from py4j.protocol import Py4JJavaError
 from pyflink.table.udf import udf
 
 from pyflink.common import RowKind
 from pyflink.common.typeinfo import Types
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
-from pyflink.find_flink_home import _find_flink_source_root
 from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings, \
     Module, ResultKind, ModuleEntry
 from pyflink.table.descriptors import FileSystem, OldCsv, Schema
 from pyflink.table.explain_detail import ExplainDetail
 from pyflink.table.expressions import col
-from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import BatchTableEnvironment
 from pyflink.table.types import RowType, Row
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
-    PyFlinkLegacyBlinkBatchTableTestCase, PyFlinkLegacyFlinkStreamTableTestCase, \
-    PyFlinkLegacyBlinkStreamTableTestCase, _load_specific_flink_module_jars
+from pyflink.testing.test_case_utils import \
+    PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
+    _load_specific_flink_module_jars
 from pyflink.util.java_utils import get_j_env_configuration
 
 
@@ -201,495 +193,6 @@ class TableEnvironmentTest(object):
         self.assert_equals(actual, expected)
 
 
-class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldStreamTableTestCase):
-
-    def test_register_table_source_from_path(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
-        t_env.register_table_source("Source", csv_source)
-
-        result = t_env.from_path("Source")
-        self.assertEqual(
-            'CatalogTable: (identifier: [`default_catalog`.`default_database`.`Source`]'
-            ', fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_register_table_sink(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
-        actual = source_sink_utils.results()
-
-        expected = ['+I[1, Hi, Hello]']
-        self.assert_equals(actual, expected)
-
-    def test_from_table_source(self):
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
-
-        result = self.t_env.from_table_source(csv_source)
-        self.assertEqual(
-            'TableSource: (fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_list_tables(self):
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = []
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-        t_env = self.t_env
-        t_env.register_table_source("Orders", csv_source)
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "Results",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        actual = t_env.list_tables()
-
-        expected = ['Orders', 'Results', 'Sinks']
-        self.assert_equals(actual, expected)
-
-    def test_temporary_views(self):
-        t_env = self.t_env
-        t_env.create_temporary_view(
-            "temporary_view_1",
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-        t_env.create_temporary_view(
-            "temporary_view_2",
-            t_env.from_elements([(1, 'Hi')], ['a', 'b']))
-
-        actual = t_env.list_temporary_views()
-        expected = ['temporary_view_1', 'temporary_view_2']
-        self.assert_equals(actual, expected)
-
-        t_env.drop_temporary_view("temporary_view_1")
-        actual = t_env.list_temporary_views()
-        expected = ['temporary_view_2']
-        self.assert_equals(actual, expected)
-
-    def test_from_path(self):
-        t_env = self.t_env
-        t_env.create_temporary_view(
-            "temporary_view_1",
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-        result = t_env.from_path("temporary_view_1")
-        self.assertEqual(
-            'CatalogTable: (identifier: [`default_catalog`.`default_database`.`temporary_view_1`]'
-            ', fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_insert_into(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
-        actual = source_sink_utils.results()
-        expected = ['+I[1, Hi, Hello]']
-        self.assert_equals(actual, expected)
-
-    def test_statement_set(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "sink2",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        stmt_set = t_env.create_statement_set()
-
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
-            .add_insert("sink2", source.filter("a < 100"), False)
-
-        actual = stmt_set.explain(ExplainDetail.CHANGELOG_MODE)
-        assert isinstance(actual, str)
-
-    def test_explain_with_multi_sinks(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "sink2",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        stmt_set = t_env.create_statement_set()
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
-        stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
-        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
-        assert isinstance(actual, str)
-
-    def test_explain_sql_without_explain_detail(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        result = t_env.explain_sql("select a + 1, b, c from %s" % source)
-
-        assert isinstance(result, str)
-
-    def test_explain_sql_with_explain_detail(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        result = t_env.explain_sql(
-            "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE)
-
-        assert isinstance(result, str)
-
-    def test_create_table_environment(self):
-        table_config = TableConfig()
-        table_config.set_max_generated_code_length(32000)
-        table_config.set_null_check(False)
-        table_config.set_local_timezone("Asia/Shanghai")
-
-        env = StreamExecutionEnvironment.get_execution_environment()
-        t_env = StreamTableEnvironment.create(env, table_config)
-
-        readed_table_config = t_env.get_config()
-
-        self.assertFalse(readed_table_config.get_null_check())
-        self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
-        self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
-    def test_create_table_environment_with_blink_planner(self):
-        t_env = StreamTableEnvironment.create(
-            StreamExecutionEnvironment.get_execution_environment(),
-            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.StreamPlanner")
-
-        t_env = StreamTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.StreamPlanner")
-
-        t_env = StreamTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().use_old_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.StreamPlanner")
-
-    def test_table_environment_with_blink_planner(self):
-        env = StreamExecutionEnvironment.get_execution_environment()
-        env.set_parallelism(1)
-        t_env = StreamTableEnvironment.create(
-            env,
-            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        sink_path = os.path.join(self.tempdir + '/result.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
-        t_env.register_table_source("source", csv_source)
-
-        t_env.register_table_sink(
-            "sink",
-            CsvTableSink(field_names, field_types, sink_path))
-        source = t_env.from_path("source")
-
-        result = source.alias("a, b, c").select("1 + a, b, c")
-
-        result.execute_insert("sink").wait()
-
-        results = []
-        with open(sink_path, 'r') as f:
-            results.append(f.readline())
-            results.append(f.readline())
-
-        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
-    def test_collect_null_value_result(self):
-        element_data = [(1, None, 'a'),
-                        (3, 4, 'b'),
-                        (5, None, 'a'),
-                        (7, 8, 'b')]
-        source = self.t_env.from_elements(element_data,
-                                          DataTypes.ROW([DataTypes.FIELD('a', DataTypes.INT()),
-                                                         DataTypes.FIELD('b', DataTypes.INT()),
-                                                         DataTypes.FIELD('c', DataTypes.STRING())]))
-        table_result = source.execute()
-        expected_result = [Row(1, None, 'a'), Row(3, 4, 'b'), Row(5, None, 'a'),
-                           Row(7, 8, 'b')]
-        with table_result.collect() as results:
-            collected_result = []
-            for result in results:
-                collected_result.append(result)
-            self.assertEqual(collected_result, expected_result)
-
-    def test_set_jars(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_t_env)
-
-    def test_set_jars_with_execute_sql(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_execute_sql)
-
-    def test_set_jars_with_statement_set(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_statement_set)
-
-    def test_set_jars_with_table(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table)
-
-    def test_set_jars_with_table_execute_insert(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_execute_insert)
-
-    def test_set_jars_with_table_to_pandas(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_to_pandas)
-
-    def test_set_classpaths(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_t_env)
-
-    def test_set_classpaths_with_execute_sql(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_execute_sql)
-
-    def test_set_classpaths_with_statement_set(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_statement_set)
-
-    def test_set_classpaths_with_table(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table)
-
-    def test_set_classpaths_with_table_execute_insert(self):
-        self.verify_set_java_dependencies(
-            "pipeline.classpaths", self.execute_with_table_execute_insert)
-
-    def test_set_classpaths_with_table_to_pandas(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table_to_pandas)
-
-    def execute_with_t_env(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        source.select("func1(a, b), func2(a, b)").execute_insert("sink").wait()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_execute_sql(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        t_env.create_temporary_view("source", source)
-        t_env.execute_sql("select func1(a, b), func2(a, b) from source") \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-
-    def execute_with_statement_set(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        t_env.create_statement_set().add_insert("sink", result).execute() \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_table(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.execute() \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-
-    def execute_with_table_execute_insert(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.execute_insert("sink").wait()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_table_to_pandas(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.to_pandas()
-
-    def verify_set_java_dependencies(self, config_key, executor):
-        original_class_loader = \
-            get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-        try:
-            jar_urls = []
-            func1_class_name = "org.apache.flink.python.util.TestScalarFunction1"
-            func2_class_name = "org.apache.flink.python.util.TestScalarFunction2"
-            func1_jar_pattern = "flink-python/target/artifacts/testUdf1.jar"
-            func2_jar_pattern = "flink-python/target/artifacts/testUdf2.jar"
-            self.ensure_jar_not_loaded(func1_class_name, func1_jar_pattern)
-            self.ensure_jar_not_loaded(func2_class_name, func2_jar_pattern)
-            jar_urls.extend(self.get_jar_url(func1_jar_pattern))
-            jar_urls.extend(self.get_jar_url(func2_jar_pattern))
-
-            # test set the "pipeline.jars" multiple times
-            self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
-            first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
-            self.t_env.get_config().get_configuration().set_string(config_key, jar_urls[0])
-            self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
-            second_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
-            self.assertEqual(first_class_loader, second_class_loader)
-
-            self.t_env.register_java_function("func1", func1_class_name)
-            self.t_env.register_java_function("func2", func2_class_name)
-            table_sink = source_sink_utils.TestAppendSink(
-                ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
-            self.t_env.register_table_sink("sink", table_sink)
-
-            executor(self.t_env)
-        finally:
-            get_gateway().jvm.Thread.currentThread().setContextClassLoader(original_class_loader)
-
-    def ensure_jar_not_loaded(self, func_class_name, jar_filename_pattern):
-        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
-        if not test_jars:
-            self.fail("'%s' is not available. Please compile the test jars first."
-                      % jar_filename_pattern)
-        try:
-            self.t_env.register_java_function("func", func_class_name)
-        except Py4JJavaError:
-            pass
-        else:
-            self.fail("The scalar function '%s' should not be able to be loaded. Please remove "
-                      "the '%s' from the classpath of the PythonGatewayServer process." %
-                      (func_class_name, jar_filename_pattern))
-
-    @staticmethod
-    def get_jar_url(jar_filename_pattern):
-        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
-        return [pathlib.Path(jar_path).as_uri() for jar_path in test_jars]
-
-    def test_collect_for_all_data_types(self):
-        expected_result = [Row(1, None, 1, True, 32767, -2147483648, 1.23,
-                               1.98932, bytearray(b'pyflink'), 'pyflink',
-                               datetime.date(2014, 9, 13), datetime.time(12, 0),
-                               datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
-                               [Row(['[pyflink]']), Row(['[pyflink]']),
-                                Row(['[pyflink]'])], {1: Row(['[flink]']), 2: Row(['[pyflink]'])},
-                               decimal.Decimal('1000000000000000000.05'),
-                               decimal.Decimal(
-                                   '1000000000000000000.05999999999999999899999999999'))]
-        source = self.t_env.from_elements([(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
-                                            bytearray(b'pyflink'), 'pyflink',
-                                            datetime.date(2014, 9, 13),
-                                            datetime.time(hour=12, minute=0, second=0,
-                                                          microsecond=123000),
-                                            datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
-                                            [Row(['pyflink']), Row(['pyflink']), Row(['pyflink'])],
-                                            {1: Row(['flink']), 2: Row(['pyflink'])},
-                                            decimal.Decimal('1000000000000000000.05'),
-                                            decimal.Decimal(
-                                                '1000000000000000000.0599999999999999989'
-                                                '9999999999'))],
-                                          DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                                         DataTypes.FIELD("b", DataTypes.BIGINT()),
-                                                         DataTypes.FIELD("c", DataTypes.TINYINT()),
-                                                         DataTypes.FIELD("d", DataTypes.BOOLEAN()),
-                                                         DataTypes.FIELD("e", DataTypes.SMALLINT()),
-                                                         DataTypes.FIELD("f", DataTypes.INT()),
-                                                         DataTypes.FIELD("g", DataTypes.FLOAT()),
-                                                         DataTypes.FIELD("h", DataTypes.DOUBLE()),
-                                                         DataTypes.FIELD("i", DataTypes.BYTES()),
-                                                         DataTypes.FIELD("j", DataTypes.STRING()),
-                                                         DataTypes.FIELD("k", DataTypes.DATE()),
-                                                         DataTypes.FIELD("l", DataTypes.TIME()),
-                                                         DataTypes.FIELD("m",
-                                                                         DataTypes.TIMESTAMP(3)),
-                                                         DataTypes.FIELD("n", DataTypes.ARRAY(
-                                                             DataTypes.ROW([DataTypes.FIELD('ss2',
-                                                                            DataTypes.STRING())]))),
-                                                         DataTypes.FIELD("o", DataTypes.MAP(
-                                                             DataTypes.BIGINT(), DataTypes.ROW(
-                                                                 [DataTypes.FIELD('ss',
-                                                                  DataTypes.STRING())]))),
-                                                         DataTypes.FIELD("p",
-                                                                         DataTypes.DECIMAL(38, 18)),
-                                                         DataTypes.FIELD("q",
-                                                                         DataTypes.DECIMAL(38,
-                                                                                           18))]))
-        table_result = source.execute()
-        with table_result.collect() as result:
-            collected_result = []
-            for i in result:
-                collected_result.append(i)
-            self.assertEqual(expected_result, collected_result)
-
-    def test_collect_with_retract(self):
-
-        expected_row_kinds = [RowKind.INSERT, RowKind.DELETE, RowKind.INSERT, RowKind.INSERT,
-                              RowKind.DELETE, RowKind.INSERT]
-        element_data = [(1, 2, 'a'),
-                        (3, 4, 'b'),
-                        (5, 6, 'a'),
-                        (7, 8, 'b')]
-        field_names = ['a', 'b', 'c']
-        source = self.t_env.from_elements(element_data, field_names)
-        table_result = self.t_env.execute_sql(
-            "SELECT SUM(a), c FROM %s group by c" % source)
-        with table_result.collect() as result:
-            collected_result = []
-            for i in result:
-                collected_result.append(i)
-
-            collected_result = [str(result) + ',' + str(result.get_row_kind())
-                                for result in collected_result]
-            expected_result = [Row(1, 'a'), Row(1, 'a'), Row(6, 'a'), Row(3, 'b'),
-                               Row(3, 'b'), Row(10, 'b')]
-            for i in range(len(expected_result)):
-                expected_result[i] = str(expected_result[i]) + ',' + str(expected_row_kinds[i])
-            expected_result.sort()
-            collected_result.sort()
-            self.assertEqual(expected_result, collected_result)
-
-
 class DataStreamConversionTestCases(object):
 
     def test_from_data_stream(self):
@@ -755,21 +258,6 @@ class DataStreamConversionTestCases(object):
         self.assertEqual(result, expected)
 
 
-class LegacyBlinkBatchTableEnvironmentTests(TableEnvironmentTest,
-                                            PyFlinkLegacyBlinkBatchTableTestCase):
-    pass
-
-
-class LegacyBlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
-                                             PyFlinkLegacyBlinkStreamTableTestCase):
-    pass
-
-
-class LegacyFlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
-                                             PyFlinkLegacyFlinkStreamTableTestCase):
-    pass
-
-
 class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamTableTestCase):
 
     def test_collect_with_retract(self):
@@ -841,117 +329,6 @@ class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamT
             self.assertEqual(expected_result, collected_result)
 
 
-class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldBatchTableTestCase):
-
-    def test_explain_with_multi_sinks(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            CsvTableSink(field_names, field_types, "path1"))
-        t_env.register_table_sink(
-            "sink2",
-            CsvTableSink(field_names, field_types, "path2"))
-
-        stmt_set = t_env.create_statement_set()
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
-        stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
-        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
-
-        assert isinstance(actual, str)
-
-    def test_statement_set(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            CsvTableSink(field_names, field_types, "path1"))
-        t_env.register_table_sink(
-            "sink2",
-            CsvTableSink(field_names, field_types, "path2"))
-
-        stmt_set = t_env.create_statement_set()
-
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
-            .add_insert("sink2", source.filter("a < 100"))
-
-        actual = stmt_set.explain()
-        assert isinstance(actual, str)
-
-    def test_create_table_environment(self):
-        table_config = TableConfig()
-        table_config.set_max_generated_code_length(32000)
-        table_config.set_null_check(False)
-        table_config.set_local_timezone("Asia/Shanghai")
-
-        env = ExecutionEnvironment.get_execution_environment()
-        t_env = BatchTableEnvironment.create(env, table_config)
-
-        readed_table_config = t_env.get_config()
-
-        self.assertFalse(readed_table_config.get_null_check())
-        self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
-        self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
-    def test_create_table_environment_with_old_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_old_planner().build())
-        self.assertEqual(
-            t_env._j_tenv.getClass().getName(),
-            "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl")
-
-    def test_create_table_environment_with_blink_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_blink_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.BatchPlanner")
-
-    def test_table_environment_with_blink_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_blink_planner().build())
-
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        sink_path = os.path.join(self.tempdir + '/results')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
-        t_env.register_table_source("source", csv_source)
-
-        t_env.register_table_sink(
-            "sink",
-            CsvTableSink(field_names, field_types, sink_path))
-        source = t_env.from_path("source")
-
-        result = source.alias("a, b, c").select("1 + a, b, c")
-
-        result.execute_insert("sink").wait()
-
-        results = []
-        for root, dirs, files in os.walk(sink_path):
-            for sub_file in files:
-                with open(os.path.join(root, sub_file), 'r') as f:
-                    line = f.readline()
-                    while line is not None and line != '':
-                        results.append(line)
-                        line = f.readline()
-
-        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
-
 class BlinkBatchTableEnvironmentTests(PyFlinkBlinkBatchTableTestCase):
 
     def test_explain_with_multi_sinks(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 19cdc5f..c357554 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -24,9 +24,8 @@ import pytz
 from pyflink.table import DataTypes, expressions as expr
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
-    PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+    PyFlinkBlinkBatchTableTestCase
 
 
 class UserDefinedFunctionTests(object):
@@ -639,24 +638,6 @@ def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
     return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
 
 
-class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
-                                            PyFlinkOldStreamTableTestCase):
-    pass
-
-
-class PyFlinkBatchUserDefinedFunctionTests(PyFlinkOldBatchTableTestCase):
-
-    def test_chaining_scalar_function(self):
-        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
-        subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
-        t = self.t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
-        t = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
-
-        result = self.collect(t)
-        self.assertEqual(result, ["+I[3, 1, 1]", "+I[7, 2, 1]", "+I[4, 3, 1]"])
-
-
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_deterministic(self):
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 976b2f6..79d66ad 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -20,8 +20,8 @@ import unittest
 from pyflink.table import DataTypes
 from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+    PyFlinkBlinkBatchTableTestCase
 
 
 class UserDefinedTableFunctionTests(object):
@@ -71,11 +71,6 @@ class UserDefinedTableFunctionTests(object):
         return source_sink_utils.results()
 
 
-class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
-                                                 PyFlinkOldStreamTableTestCase):
-    pass
-
-
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_execute_from_json_plan(self):
@@ -134,26 +129,6 @@ class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests,
     pass
 
 
-class PyFlinkBatchUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
-                                                PyFlinkOldBatchTableTestCase):
-    def _register_table_sink(self, field_names: list, field_types: list):
-        pass
-
-    def _get_output(self, t):
-        return self.collect(t)
-
-    def test_row_type_as_input_types_and_result_types(self):
-        # test input_types and result_types are DataTypes.ROW
-        a = udtf(lambda i: i,
-                 input_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
-                 result_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]))
-
-        self.assertEqual(a._input_types,
-                         [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
-        self.assertEqual(a._result_types,
-                         [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
-
-
 class MultiEmit(TableFunction, unittest.TestCase):
 
     def open(self, function_context):
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index e086ca4..dd1d56b 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -30,13 +30,10 @@ from py4j.protocol import Py4JJavaError
 
 from pyflink.common import JobExecutionResult
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
-from pyflink.table import TableConfig
 from pyflink.table.sources import CsvTableSource
-from pyflink.dataset.execution_environment import ExecutionEnvironment
 from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
 from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment, \
-    TableEnvironment
+from pyflink.table.table_environment import TableEnvironment
 from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import add_jars_to_context_class_loader, to_jarray
@@ -146,91 +143,6 @@ class PyFlinkTestCase(unittest.TestCase):
         return CsvTableSource(path, fields, data_types)
 
 
-class PyFlinkLegacyBlinkBatchTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Blink Batch TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyBlinkBatchTableTestCase, self).setUp()
-        self.t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance()
-            .in_batch_mode().use_blink_planner().build())
-        self.t_env._j_tenv.getPlanner().getExecEnv().setParallelism(2)
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyBlinkStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Blink Batch TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyBlinkStreamTableTestCase, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = StreamTableEnvironment.create(
-            self.env,
-            environment_settings=EnvironmentSettings.new_instance()
-                .in_streaming_mode().use_blink_planner().build())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyFlinkStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Flink Stream TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyFlinkStreamTableTestCase, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = StreamTableEnvironment.create(
-            self.env,
-            environment_settings=EnvironmentSettings.new_instance()
-                .in_streaming_mode().use_old_planner().build())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for old planner stream tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkOldStreamTableTestCase, self).setUp()
-        self.t_env = TableEnvironment.create(
-            EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build())
-        self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldBatchTableTestCase(PyFlinkTestCase):
-    """
-    Base class for batch tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkOldBatchTableTestCase, self).setUp()
-        self.env = ExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = BatchTableEnvironment.create(self.env, TableConfig())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-    def collect(self, table):
-        j_table = table._j_table
-        gateway = get_gateway()
-        row_result = self.t_env._j_tenv\
-            .toDataSet(j_table, gateway.jvm.Class.forName("org.apache.flink.types.Row")).collect()
-        string_result = [java_row.toString() for java_row in row_result]
-        return string_result
-
-
 class PyFlinkBlinkStreamTableTestCase(PyFlinkTestCase):
     """
     Base class for stream tests of blink planner.
diff --git a/flink-python/setup.py b/flink-python/setup.py
index efa6ffa..94b2019 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -269,7 +269,6 @@ try:
                 'pyflink.table',
                 'pyflink.util',
                 'pyflink.datastream',
-                'pyflink.dataset',
                 'pyflink.common',
                 'pyflink.fn_execution',
                 'pyflink.fn_execution.beam',
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index f381bf3..1264247 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -19,22 +19,19 @@
 package org.apache.flink.table.runtime.arrow;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.internal.TableEnvImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
 import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
 import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
@@ -664,7 +661,6 @@ public final class ArrowUtils {
         ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, baos);
         arrowStreamWriter.start();
 
-        ArrowWriter arrowWriter;
         Iterator<Row> results = table.execute().collect();
         Iterator<Row> appendOnlyResults;
         if (isAppendOnlyTable(table)) {
@@ -673,28 +669,21 @@ public final class ArrowUtils {
             appendOnlyResults = filterOutRetractRows(results);
         }
 
-        Iterator convertedResults;
-        if (isBlinkPlanner(table)) {
-            arrowWriter = createRowDataArrowWriter(root, rowType);
-            convertedResults =
-                    new Iterator<RowData>() {
-                        @Override
-                        public boolean hasNext() {
-                            return appendOnlyResults.hasNext();
-                        }
-
-                        @Override
-                        public RowData next() {
-                            DataFormatConverters.DataFormatConverter converter =
-                                    DataFormatConverters.getConverterForDataType(
-                                            defaultRowDataType);
-                            return (RowData) converter.toInternal(appendOnlyResults.next());
-                        }
-                    };
-        } else {
-            arrowWriter = createRowArrowWriter(root, rowType);
-            convertedResults = appendOnlyResults;
-        }
+        ArrowWriter arrowWriter = createRowDataArrowWriter(root, rowType);
+        Iterator convertedResults =
+                new Iterator<RowData>() {
+                    @Override
+                    public boolean hasNext() {
+                        return appendOnlyResults.hasNext();
+                    }
+
+                    @Override
+                    public RowData next() {
+                        DataFormatConverters.DataFormatConverter converter =
+                                DataFormatConverters.getConverterForDataType(defaultRowDataType);
+                        return (RowData) converter.toInternal(appendOnlyResults.next());
+                    }
+                };
 
         return new CustomIterator<byte[]>() {
             @Override
@@ -750,39 +739,22 @@ public final class ArrowUtils {
         return result.iterator();
     }
 
-    private static boolean isBlinkPlanner(Table table) {
+    private static boolean isStreamingMode(Table table) {
         TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
-        if (tableEnv instanceof TableEnvImpl) {
-            return false;
-        } else if (tableEnv instanceof TableEnvironmentImpl) {
-            Planner planner = ((TableEnvironmentImpl) tableEnv).getPlanner();
-            return planner instanceof PlannerBase;
+        if (tableEnv instanceof TableEnvironmentImpl) {
+            final RuntimeExecutionMode mode =
+                    tableEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+            if (mode == RuntimeExecutionMode.AUTOMATIC) {
+                throw new RuntimeException(
+                        String.format("Runtime execution mode '%s' is not supported yet.", mode));
+            }
+            return mode == RuntimeExecutionMode.STREAMING;
         } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Could not determine the planner type for table environment class %s.",
-                            tableEnv.getClass()));
-        }
-    }
-
-    private static boolean isStreamingMode(Table table) throws Exception {
-        TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
-        if (tableEnv instanceof BatchTableEnvironment || tableEnv instanceof BatchTableEnvImpl) {
             return false;
-        } else if (tableEnv instanceof TableEnvironmentImpl) {
-            java.lang.reflect.Field isStreamingModeMethod =
-                    TableEnvironmentImpl.class.getDeclaredField("isStreamingMode");
-            isStreamingModeMethod.setAccessible(true);
-            return (boolean) isStreamingModeMethod.get(tableEnv);
-        } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Could not determine the streaming mode for table environment class %s",
-                            tableEnv.getClass()));
         }
     }
 
-    private static boolean isAppendOnlyTable(Table table) throws Exception {
+    private static boolean isAppendOnlyTable(Table table) {
         if (isStreamingMode(table)) {
             TableEnvironmentImpl tableEnv =
                     (TableEnvironmentImpl) ((TableImpl) table).getTableEnvironment();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
deleted file mode 100644
index a83034d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Arrays;
-
-/**
- * The abstract base {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction}
- * functions for the old planner.
- */
-@Internal
-public abstract class AbstractPythonScalarFunctionFlatMap
-        extends AbstractPythonStatelessFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1";
-
-    /** The Python {@link ScalarFunction}s to be executed. */
-    public final PythonFunctionInfo[] scalarFunctions;
-
-    /** The offset of the fields which should be forwarded. */
-    private final int[] forwardedFields;
-
-    public AbstractPythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, inputType, outputType, udfInputOffsets);
-        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
-        this.forwardedFields = Preconditions.checkNotNull(forwardedFields);
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        RowTypeInfo forwardedInputTypeInfo =
-                new RowTypeInfo(
-                        Arrays.stream(forwardedFields)
-                                .mapToObj(i -> inputType.getFields().get(i))
-                                .map(RowType.RowField::getType)
-                                .map(TypeConversions::fromLogicalToDataType)
-                                .map(TypeConversions::fromDataTypeToLegacyInfo)
-                                .toArray(TypeInformation[]::new));
-        forwardedInputSerializer =
-                forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-    }
-
-    @Override
-    public PythonEnv getPythonEnv() {
-        return scalarFunctions[0].getPythonFunction().getPythonEnv();
-    }
-
-    @Override
-    public void bufferInput(Row input) {
-        Row forwardedFieldsRow = Row.project(input, forwardedFields);
-        if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
-            forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
-        }
-        forwardedInputQueue.add(forwardedFieldsRow);
-    }
-
-    @Override
-    public int getForwardedFieldsCount() {
-        return forwardedFields.length;
-    }
-
-    @Override
-    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
-        FlinkFnApi.UserDefinedFunctions.Builder builder =
-                FlinkFnApi.UserDefinedFunctions.newBuilder();
-        // add udf proto
-        for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) {
-            builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
-        }
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        return builder.build();
-    }
-
-    @Override
-    public String getFunctionUrn() {
-        return SCALAR_FUNCTION_URN;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
deleted file mode 100644
index 62ca509..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
-import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-/**
- * Base Python stateless {@link RichFlatMapFunction} used to invoke Python stateless functions for
- * the old planner.
- */
-@Internal
-public abstract class AbstractPythonStatelessFunctionFlatMap extends RichFlatMapFunction<Row, Row>
-        implements ResultTypeQueryable<Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(AbstractPythonStatelessFunctionFlatMap.class);
-
-    /** The python config. */
-    private final PythonConfig config;
-
-    /** The offsets of user-defined function inputs. */
-    private final int[] userDefinedFunctionInputOffsets;
-
-    /** The input logical type. */
-    protected final RowType inputType;
-
-    /** The output logical type. */
-    protected final RowType outputType;
-
-    /** The options used to configure the Python worker process. */
-    protected final Map<String, String> jobOptions;
-
-    /** The user-defined function input logical type. */
-    protected transient RowType userDefinedFunctionInputType;
-
-    /** The user-defined function output logical type. */
-    protected transient RowType userDefinedFunctionOutputType;
-
-    /**
-     * The queue holding the input elements for which the execution results have not been received.
-     */
-    protected transient LinkedBlockingQueue<Row> forwardedInputQueue;
-
-    /** Max number of elements to include in a bundle. */
-    private transient int maxBundleSize;
-
-    /** Number of processed elements in the current bundle. */
-    private transient int elementCount;
-
-    /** OutputStream Wrapper. */
-    transient DataOutputViewStreamWrapper baosWrapper;
-
-    /** The collector used to collect records. */
-    protected transient Collector<Row> resultCollector;
-
-    /**
-     * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
-     * execution.
-     */
-    protected transient PythonFunctionRunner pythonFunctionRunner;
-
-    /** Reusable InputStream used to holding the execution results to be deserialized. */
-    protected transient ByteArrayInputStreamWithPos bais;
-
-    /** InputStream Wrapper. */
-    protected transient DataInputViewStreamWrapper baisWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    protected transient TypeSerializer<Row> forwardedInputSerializer;
-
-    /** Reusable OutputStream used to holding the serialized input elements. */
-    protected transient ByteArrayOutputStreamWithPos baos;
-
-    public AbstractPythonStatelessFunctionFlatMap(
-            Configuration config,
-            RowType inputType,
-            RowType outputType,
-            int[] userDefinedFunctionInputOffsets) {
-        this.inputType = Preconditions.checkNotNull(inputType);
-        this.outputType = Preconditions.checkNotNull(outputType);
-        this.userDefinedFunctionInputOffsets =
-                Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
-        this.config = new PythonConfig(Preconditions.checkNotNull(config));
-        this.jobOptions = buildJobOptions(config);
-    }
-
-    protected PythonConfig getPythonConfig() {
-        return config;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        this.elementCount = 0;
-        this.maxBundleSize = config.getMaxBundleSize();
-        if (this.maxBundleSize <= 0) {
-            this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue();
-            LOG.error(
-                    "Invalid value for the maximum bundle size. Using default value of "
-                            + this.maxBundleSize
-                            + '.');
-        } else {
-            LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize);
-        }
-
-        if (config.getMaxBundleTimeMills() != PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()) {
-            LOG.info(
-                    "Maximum bundle time takes no effect in old planner under batch mode. "
-                            + "Config maximum bundle size instead! "
-                            + "Under batch mode, bundle size should be enough to control both throughput and latency.");
-        }
-        forwardedInputQueue = new LinkedBlockingQueue<>();
-        userDefinedFunctionInputType =
-                new RowType(
-                        Arrays.stream(userDefinedFunctionInputOffsets)
-                                .mapToObj(i -> inputType.getFields().get(i))
-                                .collect(Collectors.toList()));
-
-        bais = new ByteArrayInputStreamWithPos();
-        baisWrapper = new DataInputViewStreamWrapper(bais);
-
-        baos = new ByteArrayOutputStreamWithPos();
-        baosWrapper = new DataOutputViewStreamWrapper(baos);
-
-        userDefinedFunctionOutputType =
-                new RowType(
-                        outputType
-                                .getFields()
-                                .subList(getForwardedFieldsCount(), outputType.getFieldCount()));
-
-        this.pythonFunctionRunner = createPythonFunctionRunner();
-        this.pythonFunctionRunner.open(config);
-    }
-
-    @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
-        this.resultCollector = out;
-        bufferInput(value);
-        processElementInternal(value);
-        checkInvokeFinishBundleByCount();
-        emitResults();
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>)
-                LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
-                        LogicalTypeDataTypeConverter.toDataType(outputType));
-    }
-
-    @Override
-    public void close() throws Exception {
-        try {
-            invokeFinishBundle();
-
-            if (pythonFunctionRunner != null) {
-                pythonFunctionRunner.close();
-                pythonFunctionRunner = null;
-            }
-        } finally {
-            super.close();
-        }
-    }
-
-    /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager.. */
-    public abstract PythonEnv getPythonEnv();
-
-    public abstract void bufferInput(Row input);
-
-    public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
-
-    public abstract int getForwardedFieldsCount();
-
-    /** Gets the proto representation of the Python user-defined functions to be executed. */
-    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
-
-    public abstract String getInputOutputCoderUrn();
-
-    public abstract String getFunctionUrn();
-
-    public abstract void processElementInternal(Row value) throws Exception;
-
-    /** Checks whether to invoke finishBundle by elements count. Called in flatMap. */
-    protected void checkInvokeFinishBundleByCount() throws Exception {
-        elementCount++;
-        if (elementCount >= maxBundleSize) {
-            invokeFinishBundle();
-        }
-    }
-
-    protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
-        PythonDependencyInfo dependencyInfo =
-                PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache());
-        PythonEnv pythonEnv = getPythonEnv();
-        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
-            return new ProcessPythonEnvironmentManager(
-                    dependencyInfo,
-                    ConfigurationUtils.splitPaths(System.getProperty("java.io.tmpdir")),
-                    System.getenv());
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Execution type '%s' is not supported.", pythonEnv.getExecType()));
-        }
-    }
-
-    protected FlinkMetricContainer getFlinkMetricContainer() {
-        return this.config.isMetricEnabled()
-                ? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
-                : null;
-    }
-
-    protected Row getFunctionInput(Row element) {
-        return Row.project(element, userDefinedFunctionInputOffsets);
-    }
-
-    private void emitResults() throws Exception {
-        Tuple2<byte[], Integer> resultTuple;
-        while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
-            emitResult(resultTuple);
-        }
-    }
-
-    protected void invokeFinishBundle() throws Exception {
-        if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
-            emitResults();
-        }
-    }
-
-    private PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-        return new BeamTableStatelessPythonFunctionRunner(
-                getRuntimeContext().getTaskName(),
-                createPythonEnvironmentManager(),
-                userDefinedFunctionInputType,
-                userDefinedFunctionOutputType,
-                getFunctionUrn(),
-                getUserDefinedFunctionsProto(),
-                getInputOutputCoderUrn(),
-                jobOptions,
-                getFlinkMetricContainer(),
-                null,
-                0.0,
-                FlinkFnApi.CoderParam.OutputMode.SINGLE);
-    }
-
-    private Map<String, String> buildJobOptions(Configuration config) {
-        Map<String, String> jobOptions = new HashMap<>();
-        if (config.containsKey("table.exec.timezone")) {
-            jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null));
-        }
-        return jobOptions;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
deleted file mode 100644
index e1fc082..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction} functions for the
- * old planner.
- */
-@Internal
-public class PythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCALAR_FUNCTION_SCHEMA_CODER_URN =
-            "flink:coder:schema:scalar_function:v1";
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
-    /** The TypeSerializer for user-defined function execution results. */
-    private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
-    public PythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        userDefinedFunctionInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        userDefinedFunctionOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCALAR_FUNCTION_SCHEMA_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
-        byte[] rawUdfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        Row input = forwardedInputQueue.poll();
-        bais.setBuffer(rawUdfResult, 0, length);
-        Row udfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
-        this.resultCollector.collect(Row.join(input, udfResult));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
deleted file mode 100644
index fd68938..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link TableFunction} functions for the old
- * planner.
- */
-@Internal
-public final class PythonTableFunctionFlatMap extends AbstractPythonStatelessFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String TABLE_FUNCTION_SCHEMA_CODER_URN =
-            "flink:coder:schema:table_function:v1";
-
-    private static final String TABLE_FUNCTION_URN = "flink:transform:table_function:v1";
-
-    /** The Python {@link TableFunction} to be executed. */
-    private final PythonFunctionInfo tableFunction;
-
-    /** The correlate join type. */
-    private final JoinRelType joinType;
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
-    /** The TypeSerializer for user-defined function execution results. */
-    private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
-    public PythonTableFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udtfInputOffsets,
-            JoinRelType joinType) {
-        super(config, inputType, outputType, udtfInputOffsets);
-        this.tableFunction = Preconditions.checkNotNull(tableFunction);
-        Preconditions.checkArgument(
-                joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT,
-                "The join type should be inner join or left join");
-        this.joinType = joinType;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        RowTypeInfo forwardedInputTypeInfo =
-                (RowTypeInfo)
-                        TypeConversions.fromDataTypeToLegacyInfo(
-                                TypeConversions.fromLogicalToDataType(inputType));
-        forwardedInputSerializer =
-                forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-        userDefinedFunctionInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        userDefinedFunctionOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public PythonEnv getPythonEnv() {
-        return tableFunction.getPythonFunction().getPythonEnv();
-    }
-
-    @Override
-    public void bufferInput(Row input) {
-        // If the input node is a DataSetCalc node, the RichFlatMapFunction generated by codegen
-        // will reuse the output Row, so here we always copy the input Row to solve this problem.
-        input = forwardedInputSerializer.copy(input);
-        forwardedInputQueue.add(input);
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        Row input = forwardedInputQueue.poll();
-        byte[] rawUdtfResult;
-        int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
-        Row udtfResult;
-        do {
-            rawUdtfResult = resultTuple.f0;
-            length = resultTuple.f1;
-            isFinishResult = isFinishResult(rawUdtfResult, length);
-            if (!isFinishResult) {
-                bais.setBuffer(rawUdtfResult, 0, length);
-                udtfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
-                this.resultCollector.collect(Row.join(input, udtfResult));
-                resultTuple = pythonFunctionRunner.pollResult();
-                hasJoined = true;
-            } else if (joinType == JoinRelType.LEFT && !hasJoined) {
-                udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
-                for (int i = 0; i < udtfResult.getArity(); i++) {
-                    udtfResult.setField(0, null);
-                }
-                this.resultCollector.collect(Row.join(input, udtfResult));
-            }
-        } while (!isFinishResult);
-    }
-
-    @Override
-    public int getForwardedFieldsCount() {
-        return inputType.getFieldCount();
-    }
-
-    @Override
-    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
-        FlinkFnApi.UserDefinedFunctions.Builder builder =
-                FlinkFnApi.UserDefinedFunctions.newBuilder();
-        builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(tableFunction));
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        return builder.build();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return TABLE_FUNCTION_SCHEMA_CODER_URN;
-    }
-
-    @Override
-    public String getFunctionUrn() {
-        return TABLE_FUNCTION_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    private boolean isFinishResult(byte[] rawUdtfResult, int length) {
-        return length == 1 && rawUdtfResult[0] == 0x00;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
deleted file mode 100644
index d7253c5..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.functions.python.AbstractPythonScalarFunctionFlatMap;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Arrow Python {@link ScalarFunction} functions for
- * the old planner.
- */
-@Internal
-public final class ArrowPythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
-    /** The current number of elements to be included in an arrow batch. */
-    private transient int currentBatchCount;
-
-    /** Max number of elements to include in an arrow batch. */
-    private final int maxArrowBatchSize;
-
-    private transient ArrowSerializer<Row> arrowSerializer;
-
-    public ArrowPythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        maxArrowBatchSize = getPythonConfig().getMaxArrowBatchSize();
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        arrowSerializer =
-                new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
-        arrowSerializer.open(bais, baos);
-        currentBatchCount = 0;
-    }
-
-    @Override
-    public void close() throws Exception {
-        invokeCurrentBatch();
-        try {
-            super.close();
-        } finally {
-            if (arrowSerializer != null) {
-                arrowSerializer.close();
-                arrowSerializer = null;
-            }
-        }
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
-        byte[] udfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        bais.setBuffer(udfResult, 0, length);
-        int rowCount = arrowSerializer.load();
-        for (int i = 0; i < rowCount; i++) {
-            resultCollector.collect(Row.join(forwardedInputQueue.poll(), arrowSerializer.read(i)));
-        }
-        arrowSerializer.resetReader();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCHEMA_ARROW_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        arrowSerializer.write(getFunctionInput(value));
-        currentBatchCount++;
-        if (currentBatchCount >= maxArrowBatchSize) {
-            invokeCurrentBatch();
-        }
-    }
-
-    @Override
-    protected void invokeFinishBundle() throws Exception {
-        invokeCurrentBatch();
-        super.invokeFinishBundle();
-    }
-
-    private void invokeCurrentBatch() throws Exception {
-        if (currentBatchCount > 0) {
-            arrowSerializer.finishCurrentBatch();
-            currentBatchCount = 0;
-            pythonFunctionRunner.process(baos.toByteArray());
-            checkInvokeFinishBundleByCount();
-            baos.reset();
-            arrowSerializer.resetWriter();
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
deleted file mode 100644
index 507f6f9..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-/** Base Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public abstract class AbstractRowPythonScalarFunctionOperator
-        extends AbstractPythonScalarFunctionOperator<CRow, CRow, Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The collector used to collect records. */
-    protected transient StreamRecordCRowWrappingCollector cRowWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    private transient TypeSerializer<CRow> forwardedInputSerializer;
-
-    public AbstractRowPythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
-
-        CRowTypeInfo forwardedInputTypeInfo =
-                new CRowTypeInfo(
-                        new RowTypeInfo(
-                                Arrays.stream(forwardedFields)
-                                        .mapToObj(i -> inputType.getFields().get(i))
-                                        .map(RowType.RowField::getType)
-                                        .map(TypeConversions::fromLogicalToDataType)
-                                        .map(TypeConversions::fromDataTypeToLegacyInfo)
-                                        .toArray(TypeInformation[]::new)));
-        forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
-    }
-
-    @Override
-    public void bufferInput(CRow input) {
-        CRow forwardedFieldsRow =
-                new CRow(Row.project(input.row(), forwardedFields), input.change());
-        if (getExecutionConfig().isObjectReuseEnabled()) {
-            forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
-        }
-        forwardedInputQueue.add(forwardedFieldsRow);
-    }
-
-    @Override
-    public Row getFunctionInput(CRow element) {
-        return Row.project(element.row(), userDefinedFunctionInputOffsets);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
deleted file mode 100644
index b458850..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** The Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public class PythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The TypeSerializer for udf execution results. */
-    private transient TypeSerializer<Row> udfOutputTypeSerializer;
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> udfInputTypeSerializer;
-
-    public PythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open() throws Exception {
-        super.open();
-        udfInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        udfOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        udfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        byte[] rawUdfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        CRow input = forwardedInputQueue.poll();
-        cRowWrapper.setChange(input.change());
-        bais.setBuffer(rawUdfResult, 0, length);
-        Row udfResult = udfOutputTypeSerializer.deserialize(baisWrapper);
-        cRowWrapper.collect(Row.join(input.row(), udfResult));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
deleted file mode 100644
index d3ce04a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** Arrow Python {@link ScalarFunction} operator for the old planner. */
-@Internal
-public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
-    /** The current number of elements to be included in an arrow batch. */
-    private transient int currentBatchCount;
-
-    /** Max number of elements to include in an arrow batch. */
-    private transient int maxArrowBatchSize;
-
-    private transient ArrowSerializer<Row> arrowSerializer;
-
-    public ArrowPythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
-        arrowSerializer =
-                new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
-        arrowSerializer.open(bais, baos);
-        currentBatchCount = 0;
-    }
-
-    @Override
-    protected void invokeFinishBundle() throws Exception {
-        invokeCurrentBatch();
-        super.invokeFinishBundle();
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (arrowSerializer != null) {
-            arrowSerializer.close();
-            arrowSerializer = null;
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        invokeCurrentBatch();
-        super.close();
-    }
-
-    @Override
-    public void endInput() throws Exception {
-        invokeCurrentBatch();
-        super.endInput();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        byte[] udfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        bais.setBuffer(udfResult, 0, length);
-        int rowCount = arrowSerializer.load();
-        for (int i = 0; i < rowCount; i++) {
-            CRow input = forwardedInputQueue.poll();
-            cRowWrapper.setChange(input.change());
-            cRowWrapper.collect(Row.join(input.row(), arrowSerializer.read(i)));
-        }
-        arrowSerializer.resetReader();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCHEMA_ARROW_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        arrowSerializer.write(getFunctionInput(value));
-        currentBatchCount++;
-        if (currentBatchCount >= maxArrowBatchSize) {
-            invokeCurrentBatch();
-        }
-    }
-
-    private void invokeCurrentBatch() throws Exception {
-        if (currentBatchCount > 0) {
-            arrowSerializer.finishCurrentBatch();
-            currentBatchCount = 0;
-            pythonFunctionRunner.process(baos.toByteArray());
-            checkInvokeFinishBundleByCount();
-            baos.reset();
-            arrowSerializer.resetWriter();
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
deleted file mode 100644
index d6282dc..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/** The Python {@link TableFunction} operator for the legacy planner. */
-@Internal
-public class PythonTableFunctionOperator
-        extends AbstractPythonTableFunctionOperator<CRow, CRow, Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The collector used to collect records. */
-    private transient StreamRecordCRowWrappingCollector cRowWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    private transient TypeSerializer<CRow> forwardedInputSerializer;
-
-    /** The TypeSerializer for udtf execution results. */
-    private transient TypeSerializer<Row> udtfOutputTypeSerializer;
-
-    /** The TypeSerializer for udtf input elements. */
-    private transient TypeSerializer<Row> udtfInputTypeSerializer;
-
-    public PythonTableFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udtfInputOffsets,
-            JoinRelType joinType) {
-        super(
-                config,
-                tableFunction,
-                inputType,
-                outputType,
-                udtfInputOffsets,
-                JoinTypeUtil.getFlinkJoinType(joinType));
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open() throws Exception {
-        super.open();
-        this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
-        CRowTypeInfo forwardedInputTypeInfo =
-                new CRowTypeInfo(
-                        (RowTypeInfo)
-                                TypeConversions.fromDataTypeToLegacyInfo(
-                                        TypeConversions.fromLogicalToDataType(inputType)));
-        forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
-        udtfOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-        udtfInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        CRow input = forwardedInputQueue.poll();
-        byte[] rawUdtfResult;
-        int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
-        Row udtfResult;
-        do {
-            rawUdtfResult = resultTuple.f0;
-            length = resultTuple.f1;
-            isFinishResult = isFinishResult(rawUdtfResult, length);
-            if (!isFinishResult) {
-                bais.setBuffer(rawUdtfResult, 0, length);
-                udtfResult = udtfOutputTypeSerializer.deserialize(baisWrapper);
-                cRowWrapper.setChange(input.change());
-                cRowWrapper.collect(Row.join(input.row(), udtfResult));
-                resultTuple = pythonFunctionRunner.pollResult();
-                hasJoined = true;
-            } else if (joinType == FlinkJoinType.LEFT && !hasJoined) {
-                udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
-                for (int i = 0; i < udtfResult.getArity(); i++) {
-                    udtfResult.setField(0, null);
-                }
-                cRowWrapper.setChange(input.change());
-                cRowWrapper.collect(Row.join(input.row(), udtfResult));
-            }
-        } while (!isFinishResult);
-    }
-
-    @Override
-    public void bufferInput(CRow input) {
-        if (getExecutionConfig().isObjectReuseEnabled()) {
-            input = forwardedInputSerializer.copy(input);
-        }
-        forwardedInputQueue.add(input);
-    }
-
-    @Override
-    public Row getFunctionInput(CRow element) {
-        return Row.project(element.row(), userDefinedFunctionInputOffsets);
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        udtfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
deleted file mode 100644
index 502de8e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.utils;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
-/** The collector is used to convert a {@link Row} to a {@link CRow}. */
-public class StreamRecordCRowWrappingCollector implements Collector<Row> {
-
-    private final Collector<StreamRecord<CRow>> out;
-    private final CRow reuseCRow = new CRow();
-
-    /** For Table API & SQL jobs, the timestamp field is not used. */
-    private final StreamRecord<CRow> reuseStreamRecord = new StreamRecord<>(reuseCRow);
-
-    public StreamRecordCRowWrappingCollector(Collector<StreamRecord<CRow>> out) {
-        this.out = out;
-    }
-
-    public void setChange(boolean change) {
-        this.reuseCRow.change_$eq(change);
-    }
-
-    @Override
-    public void collect(Row record) {
-        reuseCRow.row_$eq(record);
-        out.collect(reuseStreamRecord);
-    }
-
-    @Override
-    public void close() {
-        out.close();
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
index c8824fe..7676bfc 100644
--- a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
+++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.flink.client.python;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.FileUtils;
 
@@ -45,7 +43,6 @@ import static org.apache.flink.table.api.Expressions.call;
 public class PythonFunctionFactoryTest {
 
     private static String tmpdir = "";
-    private static BatchTableEnvironment flinkTableEnv;
     private static StreamTableEnvironment blinkTableEnv;
     private static Table flinkSourceTable;
     private static Table blinkSourceTable;
@@ -72,13 +69,6 @@ public class PythonFunctionFactoryTest {
                             + "    return str + str\n";
             out.write(code.getBytes());
         }
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        flinkTableEnv = BatchTableEnvironment.create(env);
-        flinkTableEnv
-                .getConfig()
-                .getConfiguration()
-                .set(PYTHON_FILES, pyFilePath.getAbsolutePath());
-        flinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
         StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         blinkTableEnv =
                 StreamTableEnvironment.create(
@@ -92,7 +82,6 @@ public class PythonFunctionFactoryTest {
                 .getConfiguration()
                 .set(PYTHON_FILES, pyFilePath.getAbsolutePath());
         blinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
-        flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
         blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
     }
 
@@ -102,24 +91,6 @@ public class PythonFunctionFactoryTest {
     }
 
     public static void testPythonFunctionFactory() {
-        // flink catalog
-        flinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink catalog
-        flinkTableEnv.executeSql("alter function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink temporary catalog
-        flinkTableEnv.executeSql(
-                "create temporary function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink temporary system
-        flinkTableEnv.executeSql(
-                "create temporary system function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
         // blink catalog
         blinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
         verifyPlan(blinkSourceTable.select(call("func1", $("str"))), blinkTableEnv);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6f4cd18..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonScalarFunctionOperator}. */
-public class PythonScalarFunctionOperatorTest
-        extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
-    @Override
-    public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        return new PassThroughPythonScalarFunctionOperator(
-                config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    @Override
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    @Override
-    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
-        return StreamTableEnvironment.create(env);
-    }
-
-    @Override
-    public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
-        // If set to null, PojoSerializer is used by default which works well here.
-        return null;
-    }
-
-    private static class PassThroughPythonScalarFunctionOperator
-            extends PythonScalarFunctionOperator {
-
-        PassThroughPythonScalarFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo[] scalarFunctions,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                int[] forwardedFields) {
-            super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonScalarFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6274a83..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link ArrowPythonScalarFunctionOperator}. */
-public class ArrowPythonScalarFunctionOperatorTest
-        extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
-    public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        return new PassThroughArrowPythonScalarFunctionOperator(
-                config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
-        return StreamTableEnvironment.create(env);
-    }
-
-    @Override
-    public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
-        // If set to null, PojoSerializer is used by default which works well here.
-        return null;
-    }
-
-    private static class PassThroughArrowPythonScalarFunctionOperator
-            extends ArrowPythonScalarFunctionOperator {
-
-        PassThroughArrowPythonScalarFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo[] scalarFunctions,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                int[] forwardedFields) {
-            super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonScalarFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
deleted file mode 100644
index 1b3cc01..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonTableFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonTableFunctionOperator}. */
-public class PythonTableFunctionOperatorTest
-        extends PythonTableFunctionOperatorTestBase<CRow, CRow, Row> {
-    @Override
-    public AbstractPythonTableFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            JoinRelType joinRelType) {
-        return new PassThroughPythonTableFunctionOperator(
-                config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
-    }
-
-    @Override
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    @Override
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    private static class PassThroughPythonTableFunctionOperator
-            extends PythonTableFunctionOperator {
-
-        PassThroughPythonTableFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo tableFunction,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                JoinRelType joinRelType) {
-            super(config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonTableFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index ce7d02c..2cc04f6 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -53,7 +53,7 @@ max-line-length=100
 exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*,pyflink/shell.py,.eggs/*,pyflink/fn_execution/tests/process_mode_test_data.py,pyflink/fn_execution/*_pb2.py
 
 [mypy]
-files=pyflink/common/*.py,pyflink/table/*.py,pyflink/dataset/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
+files=pyflink/common/*.py,pyflink/table/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
 ignore_missing_imports = True
 strict_optional=False