You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 13:21:53 UTC

[flink] branch master updated (a1da49d -> a2afa88)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a1da49d  [FLINK-22655][sql-client] Fix "-i init.sql" doesn't work when first line is a comment
     new 9fe0ec7  [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python
     new a2afa88  [FLINK-22619][python] Drop Python UDF serializers for old planner

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../flink-python-test/python/python_job.py         |  16 +-
 .../python/tests/FlinkBatchPythonUdfSqlJob.java    |  56 --
 .../python/tests/FlinkStreamPythonUdfSqlJob.java   |  63 ---
 .../test-scripts/test_pyflink.sh                   |  20 -
 flink-python/dev/integration_test.sh               |   3 -
 flink-python/dev/pip_test_code.py                  |  12 +-
 flink-python/docs/index.rst                        |   1 -
 flink-python/docs/pyflink.dataset.rst              |  28 -
 flink-python/docs/pyflink.rst                      |   1 -
 flink-python/pom.xml                               |   8 +-
 .../pyflink/common/tests/test_execution_config.py  |   8 +-
 flink-python/pyflink/dataset/__init__.py           |  27 -
 .../pyflink/dataset/execution_environment.py       | 197 -------
 flink-python/pyflink/dataset/tests/__init__.py     |  17 -
 .../dataset/tests/test_execution_environment.py    | 137 -----
 .../test_execution_environment_completeness.py     |  64 ---
 ...st_stream_execution_environment_completeness.py |   4 +-
 flink-python/pyflink/shell.py                      |  35 --
 flink-python/pyflink/table/__init__.py             |   4 +-
 .../pyflink/table/examples/batch/word_count.py     |   8 +-
 flink-python/pyflink/table/table.py                |   2 +-
 flink-python/pyflink/table/table_environment.py    | 200 +------
 flink-python/pyflink/table/tests/test_calc.py      |  46 +-
 .../pyflink/table/tests/test_dependency.py         |  33 +-
 .../pyflink/table/tests/test_descriptor.py         |  56 +-
 .../pyflink/table/tests/test_pandas_conversion.py  |   7 +-
 .../pyflink/table/tests/test_pandas_udf.py         |  30 +-
 .../pyflink/table/tests/test_set_operation.py      |   4 +-
 .../pyflink/table/tests/test_shell_example.py      |  34 --
 flink-python/pyflink/table/tests/test_sort.py      |   4 +-
 flink-python/pyflink/table/tests/test_sql.py       |  12 +-
 .../table/tests/test_table_environment_api.py      | 629 +--------------------
 flink-python/pyflink/table/tests/test_udf.py       |  23 +-
 flink-python/pyflink/table/tests/test_udtf.py      |  29 +-
 flink-python/pyflink/testing/test_case_utils.py    |  90 +--
 flink-python/setup.py                              |   1 -
 .../flink/table/runtime/arrow/ArrowUtils.java      | 253 +--------
 .../runtime/arrow/readers/ArrayFieldReader.java    |  82 ---
 .../runtime/arrow/readers/ArrowFieldReader.java    |  48 --
 .../runtime/arrow/readers/BigIntFieldReader.java   |  37 --
 .../runtime/arrow/readers/BooleanFieldReader.java  |  37 --
 .../runtime/arrow/readers/DateFieldReader.java     |  44 --
 .../runtime/arrow/readers/DecimalFieldReader.java  |  39 --
 .../runtime/arrow/readers/DoubleFieldReader.java   |  37 --
 .../runtime/arrow/readers/FloatFieldReader.java    |  37 --
 .../runtime/arrow/readers/IntFieldReader.java      |  37 --
 .../runtime/arrow/readers/RowArrowReader.java      |  55 --
 .../runtime/arrow/readers/RowFieldReader.java      |  50 --
 .../runtime/arrow/readers/SmallIntFieldReader.java |  37 --
 .../runtime/arrow/readers/TimeFieldReader.java     |  68 ---
 .../arrow/readers/TimestampFieldReader.java        |  66 ---
 .../runtime/arrow/readers/TinyIntFieldReader.java  |  37 --
 .../arrow/readers/VarBinaryFieldReader.java        |  37 --
 .../runtime/arrow/readers/VarCharFieldReader.java  |  43 --
 .../arrow/serializers/RowArrowSerializer.java      |  46 --
 .../arrow/sources/RowArrowSourceFunction.java      |  52 --
 .../runtime/arrow/sources/RowArrowTableSource.java |  38 --
 .../AbstractPythonScalarFunctionFlatMap.java       | 119 ----
 .../AbstractPythonStatelessFunctionFlatMap.java    | 312 ----------
 .../python/PythonScalarFunctionFlatMap.java        |  94 ---
 .../python/PythonTableFunctionFlatMap.java         | 174 ------
 .../arrow/ArrowPythonScalarFunctionFlatMap.java    | 131 -----
 .../AbstractRowPythonScalarFunctionOperator.java   |  91 ---
 .../scalar/PythonScalarFunctionOperator.java       |  82 ---
 .../arrow/ArrowPythonScalarFunctionOperator.java   | 135 -----
 .../python/table/PythonTableFunctionOperator.java  | 142 -----
 .../utils/StreamRecordCRowWrappingCollector.java   |  53 --
 .../table/runtime/typeutils/PythonTypeUtils.java   | 144 +----
 .../client/python/PythonFunctionFactoryTest.java   |  29 -
 .../flink/table/runtime/arrow/ArrowUtilsTest.java  | 124 +---
 .../runtime/arrow/RowArrowReaderWriterTest.java    | 226 --------
 .../arrow/sources/RowArrowSourceFunctionTest.java  |  87 ---
 .../scalar/PythonScalarFunctionOperatorTest.java   | 105 ----
 .../ArrowPythonScalarFunctionOperatorTest.java     | 103 ----
 .../table/PythonTableFunctionOperatorTest.java     |  92 ---
 .../runtime/typeutils/PythonTypeUtilsTest.java     |  26 +-
 flink-python/tox.ini                               |   2 +-
 79 files changed, 124 insertions(+), 5240 deletions(-)
 delete mode 100644 flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
 delete mode 100644 flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
 delete mode 100644 flink-python/docs/pyflink.dataset.rst
 delete mode 100644 flink-python/pyflink/dataset/__init__.py
 delete mode 100644 flink-python/pyflink/dataset/execution_environment.py
 delete mode 100644 flink-python/pyflink/dataset/tests/__init__.py
 delete mode 100644 flink-python/pyflink/dataset/tests/test_execution_environment.py
 delete mode 100644 flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
 delete mode 100644 flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
 delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
 delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
 delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
 delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
 delete mode 100644 flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java

[flink] 02/02: [FLINK-22619][python] Drop Python UDF serializers for old planner

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2afa88d910bc5d99e95c8f9069fb13f5f75edd5
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Thu May 27 20:01:26 2021 +0800

    [FLINK-22619][python] Drop Python UDF serializers for old planner
    
    This closes #16008.
---
 .../flink/table/runtime/arrow/ArrowUtils.java      | 171 +---------------
 .../runtime/arrow/readers/ArrayFieldReader.java    |  82 --------
 .../runtime/arrow/readers/ArrowFieldReader.java    |  48 -----
 .../runtime/arrow/readers/BigIntFieldReader.java   |  37 ----
 .../runtime/arrow/readers/BooleanFieldReader.java  |  37 ----
 .../runtime/arrow/readers/DateFieldReader.java     |  44 ----
 .../runtime/arrow/readers/DecimalFieldReader.java  |  39 ----
 .../runtime/arrow/readers/DoubleFieldReader.java   |  37 ----
 .../runtime/arrow/readers/FloatFieldReader.java    |  37 ----
 .../runtime/arrow/readers/IntFieldReader.java      |  37 ----
 .../runtime/arrow/readers/RowArrowReader.java      |  55 -----
 .../runtime/arrow/readers/RowFieldReader.java      |  50 -----
 .../runtime/arrow/readers/SmallIntFieldReader.java |  37 ----
 .../runtime/arrow/readers/TimeFieldReader.java     |  68 -------
 .../arrow/readers/TimestampFieldReader.java        |  66 ------
 .../runtime/arrow/readers/TinyIntFieldReader.java  |  37 ----
 .../arrow/readers/VarBinaryFieldReader.java        |  37 ----
 .../runtime/arrow/readers/VarCharFieldReader.java  |  43 ----
 .../arrow/serializers/RowArrowSerializer.java      |  46 -----
 .../arrow/sources/RowArrowSourceFunction.java      |  52 -----
 .../runtime/arrow/sources/RowArrowTableSource.java |  38 ----
 .../table/runtime/typeutils/PythonTypeUtils.java   | 144 +------------
 .../flink/table/runtime/arrow/ArrowUtilsTest.java  | 124 +++--------
 .../runtime/arrow/RowArrowReaderWriterTest.java    | 226 ---------------------
 .../arrow/sources/RowArrowSourceFunctionTest.java  |  87 --------
 .../runtime/typeutils/PythonTypeUtilsTest.java     |  26 +--
 26 files changed, 40 insertions(+), 1665 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index 1264247..1821f5c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -32,26 +32,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
 import org.apache.flink.table.runtime.arrow.sources.AbstractArrowTableSource;
 import org.apache.flink.table.runtime.arrow.sources.ArrowTableSource;
-import org.apache.flink.table.runtime.arrow.sources.RowArrowTableSource;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -77,21 +59,6 @@ import org.apache.flink.table.runtime.arrow.writers.DecimalWriter;
 import org.apache.flink.table.runtime.arrow.writers.DoubleWriter;
 import org.apache.flink.table.runtime.arrow.writers.FloatWriter;
 import org.apache.flink.table.runtime.arrow.writers.IntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowArrayWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBigIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBooleanWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDateWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDecimalWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDoubleWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowFloatWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowRowWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowSmallIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimeWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimestampWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTinyIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarBinaryWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarCharWriter;
 import org.apache.flink.table.runtime.arrow.writers.RowWriter;
 import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
 import org.apache.flink.table.runtime.arrow.writers.TimeWriter;
@@ -238,73 +205,6 @@ public final class ArrowUtils {
         return new Field(fieldName, fieldType, children);
     }
 
-    /** Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. */
-    public static ArrowWriter<Row> createRowArrowWriter(VectorSchemaRoot root, RowType rowType) {
-        ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
-        List<FieldVector> vectors = root.getFieldVectors();
-        for (int i = 0; i < vectors.size(); i++) {
-            FieldVector vector = vectors.get(i);
-            vector.allocateNew();
-            fieldWriters[i] = createRowArrowFieldWriter(vector, rowType.getTypeAt(i));
-        }
-
-        return new ArrowWriter<>(root, fieldWriters);
-    }
-
-    private static ArrowFieldWriter<Row> createRowArrowFieldWriter(
-            ValueVector vector, LogicalType fieldType) {
-        if (vector instanceof TinyIntVector) {
-            return new RowTinyIntWriter((TinyIntVector) vector);
-        } else if (vector instanceof SmallIntVector) {
-            return new RowSmallIntWriter((SmallIntVector) vector);
-        } else if (vector instanceof IntVector) {
-            return new RowIntWriter((IntVector) vector);
-        } else if (vector instanceof BigIntVector) {
-            return new RowBigIntWriter((BigIntVector) vector);
-        } else if (vector instanceof BitVector) {
-            return new RowBooleanWriter((BitVector) vector);
-        } else if (vector instanceof Float4Vector) {
-            return new RowFloatWriter((Float4Vector) vector);
-        } else if (vector instanceof Float8Vector) {
-            return new RowDoubleWriter((Float8Vector) vector);
-        } else if (vector instanceof VarCharVector) {
-            return new RowVarCharWriter((VarCharVector) vector);
-        } else if (vector instanceof VarBinaryVector) {
-            return new RowVarBinaryWriter((VarBinaryVector) vector);
-        } else if (vector instanceof DecimalVector) {
-            DecimalVector decimalVector = (DecimalVector) vector;
-            return new RowDecimalWriter(
-                    decimalVector, getPrecision(decimalVector), decimalVector.getScale());
-        } else if (vector instanceof DateDayVector) {
-            return new RowDateWriter((DateDayVector) vector);
-        } else if (vector instanceof TimeSecVector
-                || vector instanceof TimeMilliVector
-                || vector instanceof TimeMicroVector
-                || vector instanceof TimeNanoVector) {
-            return new RowTimeWriter(vector);
-        } else if (vector instanceof TimeStampVector
-                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
-            return new RowTimestampWriter(vector);
-        } else if (vector instanceof ListVector) {
-            ListVector listVector = (ListVector) vector;
-            LogicalType elementType = ((ArrayType) fieldType).getElementType();
-            return new RowArrayWriter(
-                    listVector, createRowArrowFieldWriter(listVector.getDataVector(), elementType));
-        } else if (vector instanceof StructVector) {
-            RowType rowType = (RowType) fieldType;
-            ArrowFieldWriter<Row>[] fieldsWriters = new ArrowFieldWriter[rowType.getFieldCount()];
-            for (int i = 0; i < fieldsWriters.length; i++) {
-                fieldsWriters[i] =
-                        createRowArrowFieldWriter(
-                                ((StructVector) vector).getVectorById(i), rowType.getTypeAt(i));
-            }
-            return new RowRowWriter((StructVector) vector, fieldsWriters);
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported type %s.", fieldType));
-        }
-    }
-
     /**
      * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}.
      */
@@ -446,71 +346,6 @@ public final class ArrowUtils {
         }
     }
 
-    /** Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}. */
-    public static RowArrowReader createRowArrowReader(VectorSchemaRoot root, RowType rowType) {
-        List<ArrowFieldReader> fieldReaders = new ArrayList<>();
-        List<FieldVector> fieldVectors = root.getFieldVectors();
-        for (int i = 0; i < fieldVectors.size(); i++) {
-            fieldReaders.add(createRowArrowFieldReader(fieldVectors.get(i), rowType.getTypeAt(i)));
-        }
-
-        return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0]));
-    }
-
-    public static ArrowFieldReader createRowArrowFieldReader(
-            ValueVector vector, LogicalType fieldType) {
-        if (vector instanceof TinyIntVector) {
-            return new TinyIntFieldReader((TinyIntVector) vector);
-        } else if (vector instanceof SmallIntVector) {
-            return new SmallIntFieldReader((SmallIntVector) vector);
-        } else if (vector instanceof IntVector) {
-            return new IntFieldReader((IntVector) vector);
-        } else if (vector instanceof BigIntVector) {
-            return new BigIntFieldReader((BigIntVector) vector);
-        } else if (vector instanceof BitVector) {
-            return new BooleanFieldReader((BitVector) vector);
-        } else if (vector instanceof Float4Vector) {
-            return new FloatFieldReader((Float4Vector) vector);
-        } else if (vector instanceof Float8Vector) {
-            return new DoubleFieldReader((Float8Vector) vector);
-        } else if (vector instanceof VarCharVector) {
-            return new VarCharFieldReader((VarCharVector) vector);
-        } else if (vector instanceof VarBinaryVector) {
-            return new VarBinaryFieldReader((VarBinaryVector) vector);
-        } else if (vector instanceof DecimalVector) {
-            return new DecimalFieldReader((DecimalVector) vector);
-        } else if (vector instanceof DateDayVector) {
-            return new DateFieldReader((DateDayVector) vector);
-        } else if (vector instanceof TimeSecVector
-                || vector instanceof TimeMilliVector
-                || vector instanceof TimeMicroVector
-                || vector instanceof TimeNanoVector) {
-            return new TimeFieldReader(vector);
-        } else if (vector instanceof TimeStampVector
-                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
-            return new TimestampFieldReader(vector);
-        } else if (vector instanceof ListVector) {
-            ListVector listVector = (ListVector) vector;
-            LogicalType elementType = ((ArrayType) fieldType).getElementType();
-            return new ArrayFieldReader(
-                    listVector,
-                    createRowArrowFieldReader(listVector.getDataVector(), elementType),
-                    elementType);
-        } else if (vector instanceof StructVector) {
-            StructVector structVector = (StructVector) vector;
-            ArrowFieldReader[] fieldReaders = new ArrowFieldReader[structVector.size()];
-            for (int i = 0; i < fieldReaders.length; i++) {
-                fieldReaders[i] =
-                        createRowArrowFieldReader(
-                                structVector.getVectorById(i), ((RowType) fieldType).getTypeAt(i));
-            }
-            return new RowFieldReader(structVector, fieldReaders);
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported type %s.", fieldType));
-        }
-    }
-
     /**
      * Creates an {@link ArrowReader} for blink planner for the specified {@link VectorSchemaRoot}.
      */
@@ -580,11 +415,7 @@ public final class ArrowUtils {
     public static AbstractArrowTableSource createArrowTableSource(
             DataType dataType, String fileName) throws IOException {
         try (FileInputStream fis = new FileInputStream(fileName)) {
-            if (RowData.class.isAssignableFrom(dataType.getConversionClass())) {
-                return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
-            } else {
-                return new RowArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
-            }
+            return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
         }
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
deleted file mode 100644
index 32130a1..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.ListVector;
-
-import java.lang.reflect.Array;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Array. */
-@Internal
-public final class ArrayFieldReader extends ArrowFieldReader<Object[]> {
-
-    private final ArrowFieldReader arrayData;
-    private final Class<?> elementClass;
-
-    public ArrayFieldReader(
-            ListVector listVector, ArrowFieldReader arrayData, LogicalType elementType) {
-        super(listVector);
-        this.arrayData = Preconditions.checkNotNull(arrayData);
-        this.elementClass = getElementClass(elementType);
-    }
-
-    @Override
-    public Object[] read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            int startIndex = index * ListVector.OFFSET_WIDTH;
-            int start = getValueVector().getOffsetBuffer().getInt(startIndex);
-            int end =
-                    getValueVector().getOffsetBuffer().getInt(startIndex + ListVector.OFFSET_WIDTH);
-            Object[] result = (Object[]) Array.newInstance(elementClass, end - start);
-            for (int i = 0; i < result.length; i++) {
-                result[i] = arrayData.read(start + i);
-            }
-            return result;
-        }
-    }
-
-    private Class<?> getElementClass(LogicalType elementType) {
-        DataType dataType = TypeConversions.fromLogicalToDataType(elementType);
-        if (elementType instanceof TimestampType) {
-            // the default conversion class is java.time.LocalDateTime
-            dataType = dataType.bridgedTo(Timestamp.class);
-        } else if (elementType instanceof DateType) {
-            // the default conversion class is java.time.LocalDate
-            dataType = dataType.bridgedTo(Date.class);
-        } else if (elementType instanceof TimeType) {
-            // the default conversion class is java.time.LocalTime
-            dataType = dataType.bridgedTo(Time.class);
-        }
-        return dataType.getConversionClass();
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
deleted file mode 100644
index 6773b9e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.ValueVector;
-
-/**
- * Base class for arrow field reader.
- *
- * @param <OUT> Type of the row to write.
- */
-@Internal
-public abstract class ArrowFieldReader<OUT> {
-
-    /** Container which is used to store the sequence of values of a column to read. */
-    private final ValueVector valueVector;
-
-    public ArrowFieldReader(ValueVector valueVector) {
-        this.valueVector = Preconditions.checkNotNull(valueVector);
-    }
-
-    /** Returns the underlying container which stores the sequence of values of a column to read. */
-    public ValueVector getValueVector() {
-        return valueVector;
-    }
-
-    /** Sets the field value as the specified value. */
-    public abstract OUT read(int index);
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
deleted file mode 100644
index ef786db..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BigIntVector;
-
-/** {@link ArrowFieldReader} for BigInt. */
-@Internal
-public final class BigIntFieldReader extends ArrowFieldReader<Long> {
-
-    public BigIntFieldReader(BigIntVector bigIntVector) {
-        super(bigIntVector);
-    }
-
-    @Override
-    public Long read(int index) {
-        return ((BigIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
deleted file mode 100644
index 17ca9aa..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BitVector;
-
-/** {@link ArrowFieldReader} for Boolean. */
-@Internal
-public final class BooleanFieldReader extends ArrowFieldReader<Boolean> {
-
-    public BooleanFieldReader(BitVector bitVector) {
-        super(bitVector);
-    }
-
-    @Override
-    public Boolean read(int index) {
-        return ((BitVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
deleted file mode 100644
index 5eeb614..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-
-import org.apache.arrow.vector.DateDayVector;
-
-import java.sql.Date;
-
-/** {@link ArrowFieldReader} for Date. */
-@Internal
-public final class DateFieldReader extends ArrowFieldReader<Date> {
-
-    public DateFieldReader(DateDayVector dateDayVector) {
-        super(dateDayVector);
-    }
-
-    @Override
-    public Date read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            return SqlDateTimeUtils.internalToDate(((DateDayVector) getValueVector()).get(index));
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
deleted file mode 100644
index dc6744b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.DecimalVector;
-
-import java.math.BigDecimal;
-
-/** {@link ArrowFieldReader} for Decimal. */
-@Internal
-public final class DecimalFieldReader extends ArrowFieldReader<BigDecimal> {
-
-    public DecimalFieldReader(DecimalVector decimalVector) {
-        super(decimalVector);
-    }
-
-    @Override
-    public BigDecimal read(int index) {
-        return ((DecimalVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
deleted file mode 100644
index c7d7573..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float8Vector;
-
-/** {@link ArrowFieldReader} for Double. */
-@Internal
-public final class DoubleFieldReader extends ArrowFieldReader<Double> {
-
-    public DoubleFieldReader(Float8Vector doubleVector) {
-        super(doubleVector);
-    }
-
-    @Override
-    public Double read(int index) {
-        return ((Float8Vector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
deleted file mode 100644
index 3276074..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float4Vector;
-
-/** {@link ArrowFieldReader} for Float. */
-@Internal
-public final class FloatFieldReader extends ArrowFieldReader<Float> {
-
-    public FloatFieldReader(Float4Vector floatVector) {
-        super(floatVector);
-    }
-
-    @Override
-    public Float read(int index) {
-        return ((Float4Vector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
deleted file mode 100644
index 4e9a605..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.IntVector;
-
-/** {@link ArrowFieldReader} for Int. */
-@Internal
-public final class IntFieldReader extends ArrowFieldReader<Integer> {
-
-    public IntFieldReader(IntVector intVector) {
-        super(intVector);
-    }
-
-    @Override
-    public Integer read(int index) {
-        return ((IntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
deleted file mode 100644
index 2d1403d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/** {@link ArrowReader} which read the underlying Arrow format data as {@link Row}. */
-@Internal
-public final class RowArrowReader implements ArrowReader<Row> {
-
-    /**
-     * An array of readers which are responsible for the deserialization of each column of the rows.
-     */
-    private final ArrowFieldReader[] fieldReaders;
-
-    /** Reusable row used to hold the deserialized result. */
-    private final Row reuseRow;
-
-    public RowArrowReader(ArrowFieldReader[] fieldReaders) {
-        this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
-        this.reuseRow = new Row(fieldReaders.length);
-    }
-
-    /** Gets the field readers. */
-    public ArrowFieldReader[] getFieldReaders() {
-        return fieldReaders;
-    }
-
-    @Override
-    public Row read(int rowId) {
-        for (int i = 0; i < fieldReaders.length; i++) {
-            reuseRow.setField(i, fieldReaders[i].read(rowId));
-        }
-        return reuseRow;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
deleted file mode 100644
index 673bd0a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.StructVector;
-
-/** {@link ArrowFieldReader} for Row. */
-@Internal
-public final class RowFieldReader extends ArrowFieldReader<Row> {
-
-    private final ArrowFieldReader[] fieldReaders;
-
-    public RowFieldReader(StructVector structVector, ArrowFieldReader[] fieldReaders) {
-        super(structVector);
-        this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
-    }
-
-    @Override
-    public Row read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            Row row = new Row(fieldReaders.length);
-            for (int i = 0; i < fieldReaders.length; i++) {
-                row.setField(i, fieldReaders[i].read(index));
-            }
-            return row;
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
deleted file mode 100644
index 160dc51..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.SmallIntVector;
-
-/** {@link ArrowFieldReader} for SmallInt. */
-@Internal
-public final class SmallIntFieldReader extends ArrowFieldReader<Short> {
-
-    public SmallIntFieldReader(SmallIntVector smallIntVector) {
-        super(smallIntVector);
-    }
-
-    @Override
-    public Short read(int index) {
-        return ((SmallIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
deleted file mode 100644
index d29b737..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.ValueVector;
-
-import java.sql.Time;
-import java.util.TimeZone;
-
-/** {@link ArrowFieldReader} for Time. */
-@Internal
-public final class TimeFieldReader extends ArrowFieldReader<Time> {
-
-    // The local time zone.
-    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
-    public TimeFieldReader(ValueVector valueVector) {
-        super(valueVector);
-        Preconditions.checkState(
-                valueVector instanceof TimeSecVector
-                        || valueVector instanceof TimeMilliVector
-                        || valueVector instanceof TimeMicroVector
-                        || valueVector instanceof TimeNanoVector);
-    }
-
-    @Override
-    public Time read(int index) {
-        ValueVector valueVector = getValueVector();
-        if (valueVector.isNull(index)) {
-            return null;
-        } else {
-            long timeMilli;
-            if (valueVector instanceof TimeSecVector) {
-                timeMilli = ((TimeSecVector) getValueVector()).get(index) * 1000;
-            } else if (valueVector instanceof TimeMilliVector) {
-                timeMilli = ((TimeMilliVector) getValueVector()).get(index);
-            } else if (valueVector instanceof TimeMicroVector) {
-                timeMilli = ((TimeMicroVector) getValueVector()).get(index) / 1000;
-            } else {
-                timeMilli = ((TimeNanoVector) getValueVector()).get(index) / 1000000;
-            }
-            return new Time(timeMilli - LOCAL_TZ.getOffset(timeMilli));
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
deleted file mode 100644
index 8dc29c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Timestamp. */
-@Internal
-public final class TimestampFieldReader extends ArrowFieldReader<Timestamp> {
-
-    public TimestampFieldReader(ValueVector valueVector) {
-        super(valueVector);
-        Preconditions.checkState(
-                valueVector instanceof TimeStampVector
-                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
-                                == null);
-    }
-
-    @Override
-    public Timestamp read(int i) {
-        ValueVector valueVector = getValueVector();
-        if (valueVector.isNull(i)) {
-            return null;
-        } else {
-            long millisecond;
-            if (valueVector instanceof TimeStampSecVector) {
-                millisecond = ((TimeStampSecVector) valueVector).get(i) * 1000;
-            } else if (valueVector instanceof TimeStampMilliVector) {
-                millisecond = ((TimeStampMilliVector) valueVector).get(i);
-            } else if (valueVector instanceof TimeStampMicroVector) {
-                millisecond = ((TimeStampMicroVector) valueVector).get(i) / 1000;
-            } else {
-                millisecond = ((TimeStampNanoVector) valueVector).get(i) / 1_000_000;
-            }
-            return PythonTypeUtils.internalToTimestamp(millisecond);
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
deleted file mode 100644
index f27f936..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.TinyIntVector;
-
-/** {@link ArrowFieldReader} for TinyInt. */
-@Internal
-public final class TinyIntFieldReader extends ArrowFieldReader<Byte> {
-
-    public TinyIntFieldReader(TinyIntVector tinyIntVector) {
-        super(tinyIntVector);
-    }
-
-    @Override
-    public Byte read(int index) {
-        return ((TinyIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
deleted file mode 100644
index 354d031..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.VarBinaryVector;
-
-/** {@link ArrowFieldReader} for VarBinary. */
-@Internal
-public final class VarBinaryFieldReader extends ArrowFieldReader<byte[]> {
-
-    public VarBinaryFieldReader(VarBinaryVector varBinaryVector) {
-        super(varBinaryVector);
-    }
-
-    @Override
-    public byte[] read(int index) {
-        return ((VarBinaryVector) getValueVector()).get(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
deleted file mode 100644
index d69b546..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.util.StringUtf8Utils;
-
-import org.apache.arrow.vector.VarCharVector;
-
-/** {@link ArrowFieldReader} for VarChar. */
-@Internal
-public final class VarCharFieldReader extends ArrowFieldReader<String> {
-
-    public VarCharFieldReader(VarCharVector varCharVector) {
-        super(varCharVector);
-    }
-
-    @Override
-    public String read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            byte[] bytes = ((VarCharVector) getValueVector()).get(index);
-            return StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length);
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
deleted file mode 100644
index d5527c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.serializers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** It takes {@link Row} as the input type. */
-@Internal
-public class RowArrowSerializer extends ArrowSerializer<Row> {
-    public RowArrowSerializer(RowType inputType, RowType outputType) {
-        super(inputType, outputType);
-    }
-
-    @Override
-    public ArrowWriter<Row> createArrowWriter() {
-        return ArrowUtils.createRowArrowWriter(rootWriter, inputType);
-    }
-
-    @Override
-    public ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
-        return ArrowUtils.createRowArrowReader(root, outputType);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
deleted file mode 100644
index 39cefc2..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** An Arrow {@link SourceFunction} which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowSourceFunction extends AbstractArrowSourceFunction<Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    RowArrowSourceFunction(DataType dataType, byte[][] arrowData) {
-        super(dataType, arrowData);
-    }
-
-    @Override
-    ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
-        return ArrowUtils.createRowArrowReader(root, (RowType) dataType.getLogicalType());
-    }
-
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
deleted file mode 100644
index 463f69b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.types.Row;
-
-/** An Arrow TableSource which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowTableSource extends AbstractArrowTableSource<Row> {
-    public RowArrowTableSource(DataType dataType, byte[][] arrowData) {
-        super(dataType, arrowData);
-    }
-
-    @Override
-    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-        return execEnv.addSource(new RowArrowSourceFunction(dataType, arrowData));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index 67bebc2..a5259e1 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -24,25 +24,17 @@ import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
 import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -68,12 +60,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 
-import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.TimeZone;
 
 /**
@@ -113,10 +101,6 @@ public final class PythonTypeUtils {
         return logicalType.accept(new PythonTypeUtils.LogicalTypeToProtoTypeConverter());
     }
 
-    public static TypeSerializer toFlinkTypeSerializer(LogicalType logicalType) {
-        return logicalType.accept(new LogicalTypeToTypeSerializerConverter());
-    }
-
     public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
         return logicalType.accept(new LogicalTypeToBlinkTypeSerializerConverter());
     }
@@ -163,16 +147,6 @@ public final class PythonTypeUtils {
     }
 
     /**
-     * Converts the internal representation of a SQL TIMESTAMP (long) to the Java type used for UDF
-     * parameters ({@link java.sql.Timestamp}).
-     *
-     * <p>Note: The implementation refers to {@link SqlDateTimeUtils#internalToTimestamp}.
-     */
-    public static java.sql.Timestamp internalToTimestamp(long v) {
-        return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
-    }
-
-    /**
      * Converts the Java type used for UDF parameters of SQL TIMESTAMP type ({@link
      * java.sql.Timestamp}) to internal representation (long).
      *
@@ -183,43 +157,7 @@ public final class PythonTypeUtils {
         return time + LOCAL_TZ.getOffset(time);
     }
 
-    /** Convert LogicalType to conversion class for flink planner. */
-    public static class LogicalTypeToConversionClassConverter
-            extends LogicalTypeDefaultVisitor<Class> {
-
-        public static final LogicalTypeToConversionClassConverter INSTANCE =
-                new LogicalTypeToConversionClassConverter();
-
-        private LogicalTypeToConversionClassConverter() {}
-
-        @Override
-        public Class visit(DateType dateType) {
-            return Date.class;
-        }
-
-        @Override
-        public Class visit(TimeType timeType) {
-            return Time.class;
-        }
-
-        @Override
-        public Class visit(TimestampType timestampType) {
-            return Timestamp.class;
-        }
-
-        @Override
-        public Class visit(ArrayType arrayType) {
-            Class elementClass = arrayType.getElementType().accept(this);
-            return Array.newInstance(elementClass, 0).getClass();
-        }
-
-        @Override
-        protected Class defaultMethod(LogicalType logicalType) {
-            return logicalType.getDefaultConversion();
-        }
-    }
-
-    private static class LogicalTypeToTypeSerializerConverter
+    private static class LogicalTypeToBlinkTypeSerializerConverter
             extends LogicalTypeDefaultVisitor<TypeSerializer> {
         @Override
         public TypeSerializer visit(BooleanType booleanType) {
@@ -267,78 +205,6 @@ public final class PythonTypeUtils {
         }
 
         @Override
-        public TypeSerializer visit(VarCharType varCharType) {
-            return StringSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(CharType charType) {
-            return StringSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(DateType dateType) {
-            return DateSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(TimeType timeType) {
-            return TimeSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(TimestampType timestampType) {
-            return new TimestampSerializer(timestampType.getPrecision());
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public TypeSerializer visit(ArrayType arrayType) {
-            LogicalType elementType = arrayType.getElementType();
-            TypeSerializer<?> elementTypeSerializer = elementType.accept(this);
-            Class<?> elementClass =
-                    elementType.accept(LogicalTypeToConversionClassConverter.INSTANCE);
-            return new GenericArraySerializer(elementClass, elementTypeSerializer);
-        }
-
-        @Override
-        public TypeSerializer visit(MapType mapType) {
-            TypeSerializer<?> keyTypeSerializer = mapType.getKeyType().accept(this);
-            TypeSerializer<?> valueTypeSerializer = mapType.getValueType().accept(this);
-            return new MapSerializer<>(keyTypeSerializer, valueTypeSerializer);
-        }
-
-        @Override
-        public TypeSerializer visit(RowType rowType) {
-            final TypeSerializer[] fieldTypeSerializers =
-                    rowType.getFields().stream()
-                            .map(f -> f.getType().accept(this))
-                            .toArray(TypeSerializer[]::new);
-            return new RowSerializer(fieldTypeSerializers);
-        }
-
-        @Override
-        protected TypeSerializer defaultMethod(LogicalType logicalType) {
-            if (logicalType instanceof LegacyTypeInformationType) {
-                Class<?> typeClass =
-                        ((LegacyTypeInformationType) logicalType)
-                                .getTypeInformation()
-                                .getTypeClass();
-                if (typeClass == BigDecimal.class) {
-                    return BigDecSerializer.INSTANCE;
-                }
-            }
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Python UDF doesn't support logical type %s currently.",
-                            logicalType.asSummaryString()));
-        }
-    }
-
-    private static class LogicalTypeToBlinkTypeSerializerConverter
-            extends LogicalTypeToTypeSerializerConverter {
-
-        @Override
         public TypeSerializer visit(RowType rowType) {
             final TypeSerializer[] fieldTypeSerializers =
                     rowType.getFields().stream()
@@ -398,6 +264,14 @@ public final class PythonTypeUtils {
         public TypeSerializer visit(DecimalType decimalType) {
             return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
         }
+
+        @Override
+        protected TypeSerializer defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Python UDF doesn't support logical type %s currently.",
+                            logicalType.asSummaryString()));
+        }
     }
 
     /** Converter That convert the logicalType to the related Prototype. */
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
index 13a8d4a..33258c2 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
@@ -18,27 +18,10 @@
 
 package org.apache.flink.table.runtime.arrow;
 
-import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -103,7 +86,6 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -132,8 +114,7 @@ import static org.junit.Assert.assertEquals;
 /** Tests for {@link ArrowUtils}. */
 public class ArrowUtilsTest {
 
-    private static List<
-                    Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>>>
+    private static List<Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>>>
             testFields;
     private static RowType rowType;
     private static BufferAllocator allocator;
@@ -142,243 +123,219 @@ public class ArrowUtilsTest {
     public static void init() {
         testFields = new ArrayList<>();
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f1",
                         new TinyIntType(),
                         new ArrowType.Int(8, true),
                         RowTinyIntWriter.class,
                         TinyIntWriter.TinyIntWriterForRow.class,
-                        TinyIntFieldReader.class,
                         ArrowTinyIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f2",
                         new SmallIntType(),
                         new ArrowType.Int(8 * 2, true),
                         RowSmallIntWriter.class,
                         SmallIntWriter.SmallIntWriterForRow.class,
-                        SmallIntFieldReader.class,
                         ArrowSmallIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f3",
                         new IntType(),
                         new ArrowType.Int(8 * 4, true),
                         RowIntWriter.class,
                         IntWriter.IntWriterForRow.class,
-                        IntFieldReader.class,
                         ArrowIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f4",
                         new BigIntType(),
                         new ArrowType.Int(8 * 8, true),
                         RowBigIntWriter.class,
                         BigIntWriter.BigIntWriterForRow.class,
-                        BigIntFieldReader.class,
                         ArrowBigIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f5",
                         new BooleanType(),
                         new ArrowType.Bool(),
                         RowBooleanWriter.class,
                         BooleanWriter.BooleanWriterForRow.class,
-                        BooleanFieldReader.class,
                         ArrowBooleanColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f6",
                         new FloatType(),
                         new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
                         RowFloatWriter.class,
                         FloatWriter.FloatWriterForRow.class,
-                        FloatFieldReader.class,
                         ArrowFloatColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f7",
                         new DoubleType(),
                         new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
                         RowDoubleWriter.class,
                         DoubleWriter.DoubleWriterForRow.class,
-                        DoubleFieldReader.class,
                         ArrowDoubleColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f8",
                         new VarCharType(),
                         ArrowType.Utf8.INSTANCE,
                         RowVarCharWriter.class,
                         VarCharWriter.VarCharWriterForRow.class,
-                        VarCharFieldReader.class,
                         ArrowVarCharColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f9",
                         new VarBinaryType(),
                         ArrowType.Binary.INSTANCE,
                         RowVarBinaryWriter.class,
                         VarBinaryWriter.VarBinaryWriterForRow.class,
-                        VarBinaryFieldReader.class,
                         ArrowVarBinaryColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f10",
                         new DecimalType(10, 3),
                         new ArrowType.Decimal(10, 3),
                         RowDecimalWriter.class,
                         DecimalWriter.DecimalWriterForRow.class,
-                        DecimalFieldReader.class,
                         ArrowDecimalColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f11",
                         new DateType(),
                         new ArrowType.Date(DateUnit.DAY),
                         RowDateWriter.class,
                         DateWriter.DateWriterForRow.class,
-                        DateFieldReader.class,
                         ArrowDateColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f13",
                         new TimeType(0),
                         new ArrowType.Time(TimeUnit.SECOND, 32),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f14",
                         new TimeType(2),
                         new ArrowType.Time(TimeUnit.MILLISECOND, 32),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f15",
                         new TimeType(4),
                         new ArrowType.Time(TimeUnit.MICROSECOND, 64),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f16",
                         new TimeType(8),
                         new ArrowType.Time(TimeUnit.NANOSECOND, 64),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f17",
                         new LocalZonedTimestampType(0),
                         new ArrowType.Timestamp(TimeUnit.SECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f18",
                         new LocalZonedTimestampType(2),
                         new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f19",
                         new LocalZonedTimestampType(4),
                         new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f20",
                         new LocalZonedTimestampType(8),
                         new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f21",
                         new TimestampType(0),
                         new ArrowType.Timestamp(TimeUnit.SECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f22",
                         new TimestampType(2),
                         new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f23",
                         new TimestampType(4),
                         new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f24",
                         new TimestampType(8),
                         new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f25",
                         new ArrayType(new VarCharType()),
                         ArrowType.List.INSTANCE,
                         RowArrayWriter.class,
                         ArrayWriter.ArrayWriterForRow.class,
-                        ArrayFieldReader.class,
                         ArrowArrayColumnVector.class));
 
         RowType rowFieldType =
@@ -396,17 +353,16 @@ public class ArrowUtilsTest {
                                                         new RowType.RowField(
                                                                 "e2", new VarCharType())))))));
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f26",
                         rowFieldType,
                         ArrowType.Struct.INSTANCE,
                         RowRowWriter.class,
                         RowWriter.RowWriterForRow.class,
-                        RowFieldReader.class,
                         ArrowRowColumnVector.class));
 
         List<RowType.RowField> rowFields = new ArrayList<>();
-        for (Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>> field :
+        for (Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>> field :
                 testFields) {
             rowFields.add(new RowType.RowField(field.f0, field.f1));
         }
@@ -429,35 +385,13 @@ public class ArrowUtilsTest {
     }
 
     @Test
-    public void testCreateRowArrowReader() {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        RowArrowReader reader = ArrowUtils.createRowArrowReader(root, rowType);
-        ArrowFieldReader[] fieldReaders = reader.getFieldReaders();
-        for (int i = 0; i < fieldReaders.length; i++) {
-            assertEquals(testFields.get(i).f5, fieldReaders[i].getClass());
-        }
-    }
-
-    @Test
     public void testCreateRowDataArrowReader() {
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
         RowDataArrowReader reader = ArrowUtils.createRowDataArrowReader(root, rowType);
         ColumnVector[] columnVectors = reader.getColumnVectors();
         for (int i = 0; i < columnVectors.length; i++) {
-            assertEquals(testFields.get(i).f6, columnVectors[i].getClass());
-        }
-    }
-
-    @Test
-    public void testCreateRowArrowWriter() {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        ArrowWriter<Row> writer = ArrowUtils.createRowArrowWriter(root, rowType);
-        ArrowFieldWriter<Row>[] fieldWriters = writer.getFieldWriters();
-        for (int i = 0; i < fieldWriters.length; i++) {
-            assertEquals(testFields.get(i).f3, fieldWriters[i].getClass());
+            assertEquals(testFields.get(i).f5, columnVectors[i].getClass());
         }
     }
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
deleted file mode 100644
index 22c9e1b..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/** Tests for {@link ArrowReader} and {@link ArrowWriter} of Row. */
-public class RowArrowReaderWriterTest extends ArrowReaderWriterTestBase<Row> {
-    private static RowType rowType;
-    private static BufferAllocator allocator;
-
-    @BeforeClass
-    public static void init() {
-        List<LogicalType> fieldTypes = new ArrayList<>();
-        fieldTypes.add(new TinyIntType());
-        fieldTypes.add(new SmallIntType());
-        fieldTypes.add(new IntType());
-        fieldTypes.add(new BigIntType());
-        fieldTypes.add(new BooleanType());
-        fieldTypes.add(new FloatType());
-        fieldTypes.add(new DoubleType());
-        fieldTypes.add(new VarCharType());
-        fieldTypes.add(new VarBinaryType());
-        fieldTypes.add(new DecimalType(10, 0));
-        fieldTypes.add(new DateType());
-        fieldTypes.add(new TimeType(0));
-        fieldTypes.add(new TimeType(2));
-        fieldTypes.add(new TimeType(4));
-        fieldTypes.add(new TimeType(8));
-        fieldTypes.add(new LocalZonedTimestampType(0));
-        fieldTypes.add(new LocalZonedTimestampType(2));
-        fieldTypes.add(new LocalZonedTimestampType(4));
-        fieldTypes.add(new LocalZonedTimestampType(8));
-        fieldTypes.add(new TimestampType(0));
-        fieldTypes.add(new TimestampType(2));
-        fieldTypes.add(new TimestampType(4));
-        fieldTypes.add(new TimestampType(8));
-        fieldTypes.add(new ArrayType(new VarCharType()));
-        fieldTypes.add(
-                new RowType(
-                        Arrays.asList(
-                                new RowType.RowField("a", new IntType()),
-                                new RowType.RowField("b", new VarCharType()),
-                                new RowType.RowField("c", new ArrayType(new VarCharType())),
-                                new RowType.RowField("d", new TimestampType(2)),
-                                new RowType.RowField(
-                                        "e",
-                                        new RowType(
-                                                Arrays.asList(
-                                                        new RowType.RowField("e1", new IntType()),
-                                                        new RowType.RowField(
-                                                                "e2", new VarCharType())))))));
-
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
-        }
-        rowType = new RowType(rowFields);
-        allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
-    }
-
-    @Override
-    public ArrowReader<Row> createArrowReader(InputStream inputStream) throws IOException {
-        ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator);
-        reader.loadNextBatch();
-        return ArrowUtils.createRowArrowReader(reader.getVectorSchemaRoot(), rowType);
-    }
-
-    @Override
-    public Tuple2<ArrowWriter<Row>, ArrowStreamWriter> createArrowWriter(OutputStream outputStream)
-            throws IOException {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        ArrowWriter<Row> arrowWriter = ArrowUtils.createRowArrowWriter(root, rowType);
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, outputStream);
-        arrowStreamWriter.start();
-        return Tuple2.of(arrowWriter, arrowStreamWriter);
-    }
-
-    @Override
-    public Row[] getTestData() {
-        Row row1 =
-                Row.of(
-                        (byte) 1,
-                        (short) 2,
-                        3,
-                        4L,
-                        true,
-                        1.0f,
-                        1.0,
-                        "hello",
-                        "hello".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new String[] {null, null, null},
-                        Row.of(
-                                1,
-                                "hello",
-                                new String[] {null, null, null},
-                                new Timestamp(3600000),
-                                Row.of(1, "hello")));
-        Row row2 =
-                Row.of(
-                        null,
-                        (short) 2,
-                        3,
-                        4L,
-                        false,
-                        1.0f,
-                        1.0,
-                        "中文",
-                        "中文".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new String[] {"hello", "中文", null},
-                        Row.of(
-                                1,
-                                "hello",
-                                new String[] {"hello", "中文", null},
-                                new Timestamp(3600000),
-                                Row.of(1, "hello")));
-        Row row3 =
-                Row.of(
-                        (byte) 1,
-                        null,
-                        3,
-                        4L,
-                        true,
-                        1.0f,
-                        1.0,
-                        "hello",
-                        "hello".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        null,
-                        null);
-        Row row4 = new Row(rowType.getFieldCount());
-        return new Row[] {row1, row2, row3, row4};
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
deleted file mode 100644
index 329bcec..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.junit.BeforeClass;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-/** Tests for {@link RowArrowSourceFunction}. */
-public class RowArrowSourceFunctionTest extends ArrowSourceFunctionTestBase<Row> {
-
-    private static List<LogicalType> fieldTypes = new ArrayList<>();
-    private static RowType rowType;
-    private static DataType dataType;
-    private static BufferAllocator allocator;
-
-    public RowArrowSourceFunctionTest() {
-        super(
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator),
-                new RowSerializer(new TypeSerializer[] {StringSerializer.INSTANCE}),
-                Comparator.comparing(o -> (String) (o.getField(0))));
-    }
-
-    @BeforeClass
-    public static void init() {
-        fieldTypes.add(new VarCharType());
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
-        }
-        rowType = new RowType(rowFields);
-        dataType = TypeConversions.fromLogicalToDataType(rowType);
-        allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
-    }
-
-    @Override
-    public Tuple2<List<Row>, Integer> getTestData() {
-        return Tuple2.of(
-                Arrays.asList(
-                        Row.of("aaa"), Row.of("bbb"), Row.of("ccc"), Row.of("ddd"), Row.of("eee")),
-                3);
-    }
-
-    @Override
-    public ArrowWriter<Row> createArrowWriter() {
-        return ArrowUtils.createRowArrowWriter(root, rowType);
-    }
-
-    @Override
-    public AbstractArrowSourceFunction<Row> createArrowSourceFunction(byte[][] arrowData) {
-        return new RowArrowSourceFunction(dataType, arrowData);
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
index 8e0b2ec..ab2f1c1 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.table.runtime.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -32,7 +29,6 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
-import java.sql.Date;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,17 +39,6 @@ import static org.junit.Assert.assertTrue;
 public class PythonTypeUtilsTest {
 
     @Test
-    public void testLogicalTypeToFlinkTypeSerializer() {
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        rowFields.add(new RowType.RowField("f1", new BigIntType()));
-        RowType rowType = new RowType(rowFields);
-        TypeSerializer rowSerializer = PythonTypeUtils.toFlinkTypeSerializer(rowType);
-        assertTrue(rowSerializer instanceof RowSerializer);
-
-        assertEquals(1, ((RowSerializer) rowSerializer).getArity());
-    }
-
-    @Test
     public void testLogicalTypeToBlinkTypeSerializer() {
         List<RowType.RowField> rowFields = new ArrayList<>();
         rowFields.add(new RowType.RowField("f1", new BigIntType()));
@@ -85,19 +70,10 @@ public class PythonTypeUtilsTest {
         String expectedTestException =
                 "Python UDF doesn't support logical type `cat`.`db`.`MyType` currently.";
         try {
-            PythonTypeUtils.toFlinkTypeSerializer(logicalType);
+            PythonTypeUtils.toBlinkTypeSerializer(logicalType);
         } catch (Exception e) {
             assertTrue(
                     ExceptionUtils.findThrowableWithMessage(e, expectedTestException).isPresent());
         }
     }
-
-    @Test
-    public void testLogicalTypeToConversionClassConverter() {
-        PythonTypeUtils.LogicalTypeToConversionClassConverter converter =
-                PythonTypeUtils.LogicalTypeToConversionClassConverter.INSTANCE;
-        ArrayType arrayType = new ArrayType(new ArrayType(new DateType()));
-        Class<?> conversionClass = converter.visit(arrayType);
-        assertEquals(Date[][].class, conversionClass);
-    }
 }

[flink] 01/02: [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fe0ec7caa5fdc7d22e6655e557df49b6263ae76
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 19 09:20:31 2021 +0200

    [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python
    
    This is a major cleanup of the Python module that drops support for
    BatchTableEnvironment and old planner.
    
    Removes usages of:
    - DataSet
    - BatchTableEnvironment
    - Legacy planner
    - ExecutionEnvironment
---
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../docs/dev/python/table/table_environment.md     |   2 +-
 .../flink-python-test/python/python_job.py         |  16 +-
 .../python/tests/FlinkBatchPythonUdfSqlJob.java    |  56 --
 .../python/tests/FlinkStreamPythonUdfSqlJob.java   |  63 ---
 .../test-scripts/test_pyflink.sh                   |  20 -
 flink-python/dev/integration_test.sh               |   3 -
 flink-python/dev/pip_test_code.py                  |  12 +-
 flink-python/docs/index.rst                        |   1 -
 flink-python/docs/pyflink.dataset.rst              |  28 -
 flink-python/docs/pyflink.rst                      |   1 -
 flink-python/pom.xml                               |   8 +-
 .../pyflink/common/tests/test_execution_config.py  |   8 +-
 flink-python/pyflink/dataset/__init__.py           |  27 -
 .../pyflink/dataset/execution_environment.py       | 197 -------
 flink-python/pyflink/dataset/tests/__init__.py     |  17 -
 .../dataset/tests/test_execution_environment.py    | 137 -----
 .../test_execution_environment_completeness.py     |  64 ---
 ...st_stream_execution_environment_completeness.py |   4 +-
 flink-python/pyflink/shell.py                      |  35 --
 flink-python/pyflink/table/__init__.py             |   4 +-
 .../pyflink/table/examples/batch/word_count.py     |   8 +-
 flink-python/pyflink/table/table.py                |   2 +-
 flink-python/pyflink/table/table_environment.py    | 200 +------
 flink-python/pyflink/table/tests/test_calc.py      |  46 +-
 .../pyflink/table/tests/test_dependency.py         |  33 +-
 .../pyflink/table/tests/test_descriptor.py         |  56 +-
 .../pyflink/table/tests/test_pandas_conversion.py  |   7 +-
 .../pyflink/table/tests/test_pandas_udf.py         |  30 +-
 .../pyflink/table/tests/test_set_operation.py      |   4 +-
 .../pyflink/table/tests/test_shell_example.py      |  34 --
 flink-python/pyflink/table/tests/test_sort.py      |   4 +-
 flink-python/pyflink/table/tests/test_sql.py       |  12 +-
 .../table/tests/test_table_environment_api.py      | 629 +--------------------
 flink-python/pyflink/table/tests/test_udf.py       |  23 +-
 flink-python/pyflink/table/tests/test_udtf.py      |  29 +-
 flink-python/pyflink/testing/test_case_utils.py    |  90 +--
 flink-python/setup.py                              |   1 -
 .../flink/table/runtime/arrow/ArrowUtils.java      |  82 +--
 .../AbstractPythonScalarFunctionFlatMap.java       | 119 ----
 .../AbstractPythonStatelessFunctionFlatMap.java    | 312 ----------
 .../python/PythonScalarFunctionFlatMap.java        |  94 ---
 .../python/PythonTableFunctionFlatMap.java         | 174 ------
 .../arrow/ArrowPythonScalarFunctionFlatMap.java    | 131 -----
 .../AbstractRowPythonScalarFunctionOperator.java   |  91 ---
 .../scalar/PythonScalarFunctionOperator.java       |  82 ---
 .../arrow/ArrowPythonScalarFunctionOperator.java   | 135 -----
 .../python/table/PythonTableFunctionOperator.java  | 142 -----
 .../utils/StreamRecordCRowWrappingCollector.java   |  53 --
 .../client/python/PythonFunctionFactoryTest.java   |  29 -
 .../scalar/PythonScalarFunctionOperatorTest.java   | 105 ----
 .../ArrowPythonScalarFunctionOperatorTest.java     | 103 ----
 .../table/PythonTableFunctionOperatorTest.java     |  92 ---
 flink-python/tox.ini                               |   2 +-
 54 files changed, 84 insertions(+), 3575 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md
index 75c925f..0ba2738 100644
--- a/docs/content.zh/docs/dev/python/table/table_environment.md
+++ b/docs/content.zh/docs/dev/python/table/table_environment.md
@@ -48,7 +48,7 @@ table_env = TableEnvironment.create(env_settings)
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md
index ea50ebd..d00e1d2 100644
--- a/docs/content/docs/dev/python/table/table_environment.md
+++ b/docs/content/docs/dev/python/table/table_environment.md
@@ -49,7 +49,7 @@ Alternatively, users can create a `StreamTableEnvironment` from an existing `Str
 
 ```python
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
 
 # create a blink streaming TableEnvironment from a StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index a85e633..2afcf19 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -21,8 +21,7 @@ import shutil
 import sys
 import tempfile
 
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import EnvironmentSettings, TableEnvironment
 
 
 def word_count():
@@ -34,9 +33,8 @@ def word_count():
               "License you may not use this file except in compliance " \
               "with the License"
 
-    t_config = TableConfig()
-    env = ExecutionEnvironment.get_execution_environment()
-    t_env = BatchTableEnvironment.create(env, t_config)
+    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+    t_env = TableEnvironment.create(environment_settings=env_settings)
 
     # used to test pipeline.jars and pipleline.classpaths
     config_key = sys.argv[1]
@@ -68,9 +66,9 @@ def word_count():
             'connector.path' = '{}'
         )
         """.format(result_path)
-    t_env.sql_update(sink_ddl)
+    t_env.execute_sql(sink_ddl)
 
-    t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+    t_env.execute_sql("create temporary system function add_one as 'add_one.add_one' language python")
     t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
 
     elements = [(word, 0) for word in content.split(" ")]
@@ -78,9 +76,7 @@ def word_count():
         .select("word, add_one(count) as count, add_one_java(count) as count_java") \
         .group_by("word") \
         .select("word, count(count) as count, count(count_java) as count_java") \
-        .insert_into("Results")
-
-    t_env.execute("word_count")
+        .execute_insert("Results")
 
 
 if __name__ == '__main__':
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
deleted file mode 100644
index 3f6768c..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink batch mode. */
-public class FlinkBatchPythonUdfSqlJob {
-
-    public static void main(String[] args) {
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
-        tEnv.executeSql(
-                "create temporary system function add_one as 'add_one.add_one' language python");
-
-        tEnv.createTemporaryView("source", tEnv.fromDataSet(env.fromElements(1L, 2L, 3L)).as("a"));
-
-        Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
-        List<Long> actual = new ArrayList<>();
-        while (result.hasNext()) {
-            Row r = result.next();
-            actual.add((Long) r.getField(0));
-        }
-
-        List<Long> expected = Arrays.asList(2L, 3L, 4L);
-        if (!actual.equals(expected)) {
-            throw new AssertionError(
-                    String.format(
-                            "The output result: %s is not as expected: %s!", actual, expected));
-        }
-    }
-}
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
deleted file mode 100644
index f90767e..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink stream mode. */
-public class FlinkStreamPythonUdfSqlJob {
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env,
-                        EnvironmentSettings.newInstance()
-                                .useOldPlanner()
-                                .inStreamingMode()
-                                .build());
-        tEnv.executeSql(
-                "create temporary system function add_one as 'add_one.add_one' language python");
-
-        tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));
-
-        Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
-        List<Long> actual = new ArrayList<>();
-        while (result.hasNext()) {
-            Row r = result.next();
-            actual.add((Long) r.getField(0));
-        }
-
-        List<Long> expected = Arrays.asList(2L, 3L, 4L);
-        if (!actual.equals(expected)) {
-            throw new AssertionError(
-                    String.format(
-                            "The output result: %s is not as expected: %s!", actual, expected));
-        }
-    }
-}
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index 24ffc59..27cfd0a 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -155,26 +155,6 @@ PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
     -c org.apache.flink.python.tests.BlinkBatchPythonUdfSqlJob \
     "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
 
-echo "Test flink stream python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkStreamPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
-echo "Test flink batch python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-    -p 2 \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -c org.apache.flink.python.tests.FlinkBatchPythonUdfSqlJob \
-    "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
 echo "Test using python udf in sql client:\n"
 SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
 
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index d2f10e5..c598d09 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -36,9 +36,6 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
 # test common module
 test_module "common"
 
-# test dataset module
-test_module "dataset"
-
 # test datastream module
 test_module "datastream"
 
diff --git a/flink-python/dev/pip_test_code.py b/flink-python/dev/pip_test_code.py
index 0904b53..3bd1062 100755
--- a/flink-python/dev/pip_test_code.py
+++ b/flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 # test pyflink shell environment
-from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
 
 import tempfile
 import os
@@ -28,9 +28,9 @@ if os.path.exists(sink_path):
         os.remove(sink_path)
     else:
         shutil.rmtree(sink_path)
-b_env.set_parallelism(1)
-t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-bt_env.connect(FileSystem().path(sink_path)) \
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
+st_env.connect(FileSystem().path(sink_path)) \
     .with_format(OldCsv()
                  .field_delimiter(',')
                  .field("a", DataTypes.BIGINT())
@@ -40,9 +40,9 @@ bt_env.connect(FileSystem().path(sink_path)) \
                  .field("a", DataTypes.BIGINT())
                  .field("b", DataTypes.STRING())
                  .field("c", DataTypes.STRING())) \
-    .create_temporary_table("batch_sink")
+    .create_temporary_table("csv_sink")
 
-t.select("a + 1, b, c").execute_insert("batch_sink").wait()
+t.select("a + 1, b, c").execute_insert("csv_sink").wait()
 
 with open(sink_path, 'r') as f:
     lines = f.read()
diff --git a/flink-python/docs/index.rst b/flink-python/docs/index.rst
index 3d6f538..7a174fd 100644
--- a/flink-python/docs/index.rst
+++ b/flink-python/docs/index.rst
@@ -26,7 +26,6 @@ Welcome to Flink Python API Docs!
    pyflink
    pyflink.common
    pyflink.table
-   pyflink.dataset
    pyflink.datastream
    pyflink.metrics
 
diff --git a/flink-python/docs/pyflink.dataset.rst b/flink-python/docs/pyflink.dataset.rst
deleted file mode 100644
index dc42fcd..0000000
--- a/flink-python/docs/pyflink.dataset.rst
+++ /dev/null
@@ -1,28 +0,0 @@
-.. ################################################################################
-     Licensed to the Apache Software Foundation (ASF) under one
-     or more contributor license agreements.  See the NOTICE file
-     distributed with this work for additional information
-     regarding copyright ownership.  The ASF licenses this file
-     to you under the Apache License, Version 2.0 (the
-     "License"); you may not use this file except in compliance
-     with the License.  You may obtain a copy of the License at
-
-         http://www.apache.org/licenses/LICENSE-2.0
-
-     Unless required by applicable law or agreed to in writing, software
-     distributed under the License is distributed on an "AS IS" BASIS,
-     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     See the License for the specific language governing permissions and
-    limitations under the License.
-   ################################################################################
-
-pyflink.dataset package
-=======================
-
-Module contents
----------------
-
-.. automodule:: pyflink.dataset
-    :members:
-    :undoc-members:
-    :show-inheritance:
diff --git a/flink-python/docs/pyflink.rst b/flink-python/docs/pyflink.rst
index ff17b2a..81ba9f7 100644
--- a/flink-python/docs/pyflink.rst
+++ b/flink-python/docs/pyflink.rst
@@ -27,7 +27,6 @@ Subpackages
 
     pyflink.common
     pyflink.table
-    pyflink.dataset
     pyflink.datastream
 
 .. automodule:: pyflink
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 9bb15ec..7199b2a 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -70,7 +70,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -80,12 +80,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
 
 		<!-- Beam dependencies -->
 
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 8f3ba67..33433df 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.dataset import ExecutionEnvironment
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -24,7 +24,7 @@ from pyflink.testing.test_case_utils import PyFlinkTestCase
 class ExecutionConfigTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = ExecutionEnvironment.get_execution_environment()
+        self.env = StreamExecutionEnvironment.get_execution_environment()
         self.execution_config = self.env.get_config()
 
     def test_constant(self):
@@ -253,9 +253,9 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_equals_and_hash(self):
 
-        config1 = ExecutionEnvironment.get_execution_environment().get_config()
+        config1 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
-        config2 = ExecutionEnvironment.get_execution_environment().get_config()
+        config2 = StreamExecutionEnvironment.get_execution_environment().get_config()
 
         self.assertEqual(config1, config2)
 
diff --git a/flink-python/pyflink/dataset/__init__.py b/flink-python/pyflink/dataset/__init__.py
deleted file mode 100644
index 3bd64cd..0000000
--- a/flink-python/pyflink/dataset/__init__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""
-Important classes of Flink Batch API:
-
-    - :class:`ExecutionEnvironment`:
-      The ExecutionEnvironment is the context in which a batch program is executed.
-"""
-from pyflink.dataset.execution_environment import ExecutionEnvironment
-
-__all__ = ['ExecutionEnvironment']
diff --git a/flink-python/pyflink/dataset/execution_environment.py b/flink-python/pyflink/dataset/execution_environment.py
deleted file mode 100644
index 821189c..0000000
--- a/flink-python/pyflink/dataset/execution_environment.py
+++ /dev/null
@@ -1,197 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
-from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import load_java_class
-
-
-class ExecutionEnvironment(object):
-    """
-    The ExecutionEnvironment is the context in which a program is executed.
-
-    The environment provides methods to control the job execution (such as setting the parallelism)
-    and to interact with the outside world (data access).
-    """
-
-    def __init__(self, j_execution_environment):
-        self._j_execution_environment = j_execution_environment
-
-    def get_parallelism(self) -> int:
-        """
-        Gets the parallelism with which operation are executed by default.
-
-        :return: The parallelism.
-        """
-        return self._j_execution_environment.getParallelism()
-
-    def set_parallelism(self, parallelism: int):
-        """
-        Sets the parallelism for operations executed through this environment.
-        Setting a parallelism of x here will cause all operators to run with
-        x parallel instances.
-
-        :param parallelism: The parallelism.
-        """
-        self._j_execution_environment.setParallelism(parallelism)
-
-    def get_default_local_parallelism(self) -> int:
-        """
-        Gets the default parallelism that will be used for the local execution environment.
-
-        :return: The parallelism.
-        """
-        return self._j_execution_environment.getDefaultLocalParallelism()
-
-    def set_default_local_parallelism(self, parallelism: int):
-        """
-        Sets the default parallelism that will be used for the local execution environment.
-
-        :param parallelism: The parallelism.
-        """
-        self._j_execution_environment.setDefaultLocalParallelism(parallelism)
-
-    def get_config(self) -> ExecutionConfig:
-        """
-        Gets the config object that defines execution parameters.
-
-        :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
-        """
-        return ExecutionConfig(self._j_execution_environment.getConfig())
-
-    def set_restart_strategy(self, restart_strategy_configuration: RestartStrategyConfiguration):
-        """
-        Sets the restart strategy configuration. The configuration specifies which restart strategy
-        will be used for the execution graph in case of a restart.
-
-        Example:
-        ::
-
-            >>> env.set_restart_strategy(RestartStrategies.no_restart())
-
-        :param restart_strategy_configuration: Restart strategy configuration to be set.
-        """
-        self._j_execution_environment.setRestartStrategy(
-            restart_strategy_configuration._j_restart_strategy_configuration)
-
-    def get_restart_strategy(self) -> RestartStrategyConfiguration:
-        """
-        Returns the specified restart strategy configuration.
-
-        :return: The restart strategy configuration to be used.
-        """
-        return RestartStrategies._from_j_restart_strategy(
-            self._j_execution_environment.getRestartStrategy())
-
-    def add_default_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
-        """
-        Adds a new Kryo default serializer to the Runtime.
-
-        Example:
-        ::
-
-            >>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")
-
-        :param type_class_name: The full-qualified java class name of the types serialized with the
-                                given serializer.
-        :param serializer_class_name: The full-qualified java class name of the serializer to use.
-        """
-        type_clz = load_java_class(type_class_name)
-        j_serializer_clz = load_java_class(serializer_class_name)
-        self._j_execution_environment.addDefaultKryoSerializer(type_clz, j_serializer_clz)
-
-    def register_type_with_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
-        """
-        Registers the given Serializer via its class as a serializer for the given type at the
-        KryoSerializer.
-
-        Example:
-        ::
-
-            >>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
-            ...                                        "com.aaa.bbb.Serializer")
-
-        :param type_class_name: The full-qualified java class name of the types serialized with
-                                the given serializer.
-        :param serializer_class_name: The full-qualified java class name of the serializer to use.
-        """
-        type_clz = load_java_class(type_class_name)
-        j_serializer_clz = load_java_class(serializer_class_name)
-        self._j_execution_environment.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)
-
-    def register_type(self, type_class_name: str):
-        """
-        Registers the given type with the serialization stack. If the type is eventually
-        serialized as a POJO, then the type is registered with the POJO serializer. If the
-        type ends up being serialized with Kryo, then it will be registered at Kryo to make
-        sure that only tags are written.
-
-        Example:
-        ::
-
-            >>> env.register_type("com.aaa.bbb.TypeClass")
-
-        :param type_class_name: The full-qualified java class name of the type to register.
-        """
-        type_clz = load_java_class(type_class_name)
-        self._j_execution_environment.registerType(type_clz)
-
-    def execute(self, job_name: str = None) -> JobExecutionResult:
-        """
-        Triggers the program execution. The environment will execute all parts of the program that
-        have resulted in a "sink" operation.
-
-        The program execution will be logged and displayed with the given job name.
-
-        :param job_name: Desired name of the job, optional.
-        :return: The result of the job execution, containing elapsed time and accumulators.
-        """
-        if job_name is None:
-            return JobExecutionResult(self._j_execution_environment.execute())
-        else:
-            return JobExecutionResult(self._j_execution_environment.execute(job_name))
-
-    def get_execution_plan(self) -> str:
-        """
-        Creates the plan with which the system will execute the program, and returns it as
-        a String using a JSON representation of the execution data flow graph.
-        Note that this needs to be called, before the plan is executed.
-
-        If the compiler could not be instantiated, or the master could not
-        be contacted to retrieve information relevant to the execution planning,
-        an exception will be thrown.
-
-        :return: The execution plan of the program, as a JSON String.
-        """
-        return self._j_execution_environment.getExecutionPlan()
-
-    @staticmethod
-    def get_execution_environment() -> 'ExecutionEnvironment':
-        """
-        Creates an execution environment that represents the context in which the program is
-        currently executed. If the program is invoked standalone, this method returns a local
-        execution environment. If the program is invoked from within the command line client to be
-        submitted to a cluster, this method returns the execution environment of this cluster.
-
-        :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
-        """
-        gateway = get_gateway()
-        j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
-            .getExecutionEnvironment()
-        return ExecutionEnvironment(j_execution_environment)
diff --git a/flink-python/pyflink/dataset/tests/__init__.py b/flink-python/pyflink/dataset/tests/__init__.py
deleted file mode 100644
index 65b48d4..0000000
--- a/flink-python/pyflink/dataset/tests/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
deleted file mode 100644
index 49ed8c9..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ /dev/null
@@ -1,137 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import json
-import os
-import tempfile
-import time
-
-import unittest
-
-from pyflink.common import ExecutionConfig, RestartStrategies
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import DataTypes, BatchTableEnvironment, CsvTableSource, CsvTableSink
-from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table
-
-
-class ExecutionEnvironmentTests(PyFlinkTestCase):
-
-    def setUp(self):
-        self.env = ExecutionEnvironment.get_execution_environment()
-
-    def test_get_set_parallelism(self):
-
-        self.env.set_parallelism(10)
-
-        parallelism = self.env.get_parallelism()
-
-        self.assertEqual(parallelism, 10)
-
-    def test_get_set_default_local_parallelism(self):
-
-        self.env.set_default_local_parallelism(8)
-
-        parallelism = self.env.get_default_local_parallelism()
-
-        self.assertEqual(parallelism, 8)
-
-    def test_get_config(self):
-
-        execution_config = self.env.get_config()
-
-        self.assertIsInstance(execution_config, ExecutionConfig)
-
-    def test_set_get_restart_strategy(self):
-
-        self.env.set_restart_strategy(RestartStrategies.no_restart())
-
-        restart_strategy = self.env.get_restart_strategy()
-
-        self.assertEqual(restart_strategy, RestartStrategies.no_restart())
-
-    def test_add_default_kryo_serializer(self):
-
-        self.env.add_default_kryo_serializer(
-            "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
-            "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
-        class_dict = self.env.get_config().get_default_kryo_serializer_classes()
-
-        self.assertEqual(class_dict,
-                         {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
-                          'org.apache.flink.runtime.state'
-                          '.StateBackendTestBase$CustomKryoTestSerializer'})
-
-    def test_register_type_with_kryo_serializer(self):
-
-        self.env.register_type_with_kryo_serializer(
-            "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
-            "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
-        class_dict = self.env.get_config().get_registered_types_with_kryo_serializer_classes()
-
-        self.assertEqual(class_dict,
-                         {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
-                          'org.apache.flink.runtime.state'
-                          '.StateBackendTestBase$CustomKryoTestSerializer'})
-
-    def test_register_type(self):
-
-        self.env.register_type("org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
-        type_list = self.env.get_config().get_registered_pojo_types()
-
-        self.assertEqual(type_list,
-                         ["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])
-
-    @unittest.skip("Python API does not support DataSet now. refactor this test later")
-    def test_get_execution_plan(self):
-        tmp_dir = tempfile.gettempdir()
-        source_path = os.path.join(tmp_dir + '/streaming.csv')
-        tmp_csv = os.path.join(tmp_dir + '/streaming2.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-
-        t_env = BatchTableEnvironment.create(self.env)
-        csv_source = CsvTableSource(source_path, field_names, field_types)
-        t_env.register_table_source("Orders", csv_source)
-        t_env.register_table_sink(
-            "Results",
-            CsvTableSink(field_names, field_types, tmp_csv))
-        t_env.from_path("Orders").execute_insert("Results").wait()
-
-        plan = self.env.get_execution_plan()
-
-        json.loads(plan)
-
-    def test_execute(self):
-        tmp_dir = tempfile.gettempdir()
-        field_names = ['a', 'b', 'c']
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env = BatchTableEnvironment.create(self.env)
-        t_env.register_table_sink(
-            'Results',
-            CsvTableSink(field_names, field_types,
-                         os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
-        execution_result = exec_insert_table(
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']),
-            'Results')
-        self.assertIsNotNone(execution_result.get_job_id())
-        self.assertIsNotNone(execution_result.get_net_runtime())
-        self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
-        self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
-        self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
deleted file mode 100644
index 2d49844..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ /dev/null
@@ -1,64 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, PyFlinkTestCase
-
-
-class ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
-                                            PyFlinkTestCase):
-
-    @classmethod
-    def python_class(cls):
-        return ExecutionEnvironment
-
-    @classmethod
-    def java_class(cls):
-        return "org.apache.flink.api.java.ExecutionEnvironment"
-
-    @classmethod
-    def excluded_methods(cls):
-        # Exclude these methods for the time being, because current
-        # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
-        # DataSet/DataStream API, but to the Table API configuration.
-        # Currently only the methods for configuration is added.
-        # 'setSessionTimeout', 'getSessionTimeout', 'setNumberOfExecutionRetries',
-        # 'getNumberOfExecutionRetries' is deprecated, exclude them.
-        # 'access$000' is generated by java compiler, exclude it too.
-        return {'resetContextEnvironment', 'getSessionTimeout', 'fromParallelCollection',
-                'getId', 'registerCachedFile', 'setNumberOfExecutionRetries', 'readTextFile',
-                'getNumberOfExecutionRetries', 'registerCachedFilesWithPlan',
-                'getLastJobExecutionResult', 'readCsvFile', 'initializeContextEnvironment',
-                'createLocalEnvironment', 'createLocalEnvironmentWithWebUI', 'createProgramPlan',
-                'getIdString', 'setSessionTimeout', 'fromElements', 'createRemoteEnvironment',
-                'startNewSession', 'fromCollection', 'readTextFileWithValue', 'registerDataSink',
-                'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
-                'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
-                'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
-                'executeAsync', 'registerJobListener', 'clearJobListeners', 'configure'}
-
-
-if __name__ == '__main__':
-    import unittest
-
-    try:
-        import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
-    except ImportError:
-        testRunner = None
-    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 890df9c..27d7c37 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -34,8 +34,8 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
     @classmethod
     def excluded_methods(cls):
         # Exclude these methods for the time being, because current
-        # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
-        # DataSet/DataStream API, but to the Table API configuration.
+        # StreamExecutionEnvironment do not apply to the
+        # DataStream API, but to the Table API configuration.
         # Currently only the methods for configuration is added.
         # 'isForceCheckpointing', 'getNumberOfExecutionRetries', 'setNumberOfExecutionRetries'
         # is deprecated, exclude them.
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index b5c9dc0..841991e 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -20,7 +20,6 @@ import platform
 import sys
 
 from pyflink.common import *
-from pyflink.dataset import *
 from pyflink.datastream import *
 from pyflink.table import *
 from pyflink.table.catalog import *
@@ -73,36 +72,6 @@ welcome_msg = u'''
 
 NOTE: Use the prebound Table Environment to implement batch or streaming Table programs.
 
-  Batch - Use 'b_env' and 'bt_env' variables
-
-    *
-    * import tempfile
-    * import os
-    * import shutil
-    * sink_path = tempfile.gettempdir() + '/batch.csv'
-    * if os.path.exists(sink_path):
-    *     if os.path.isfile(sink_path):
-    *         os.remove(sink_path)
-    *     else:
-    *         shutil.rmtree(sink_path)
-    * b_env.set_parallelism(1)
-    * t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-    * bt_env.connect(FileSystem().path(sink_path)) \\
-    *     .with_format(OldCsv()
-    *                  .field_delimiter(',')
-    *                  .field("a", DataTypes.BIGINT())
-    *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING())) \\
-    *     .with_schema(Schema()
-    *                  .field("a", DataTypes.BIGINT())
-    *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING())) \\
-    *     .create_temporary_table("batch_sink")
-    *
-    * t.select("a + 1, b, c").insert_into("batch_sink")
-    *
-    * bt_env.execute("batch_job")
-
   Streaming - Use 's_env' and 'st_env' variables
 
     *
@@ -135,10 +104,6 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
 '''
 utf8_out.write(welcome_msg)
 
-b_env = ExecutionEnvironment.get_execution_environment()
-
-bt_env = BatchTableEnvironment.create(b_env)
-
 s_env = StreamExecutionEnvironment.get_execution_environment()
 
 st_env = StreamTableEnvironment.create(s_env)
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 75436c9..1cf4313 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -81,8 +81,7 @@ from pyflink.table.statement_set import StatementSet
 from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \
     WindowGroupedTable
 from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
-                                             BatchTableEnvironment)
+from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment)
 from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes, UserDefinedType, Row, RowKind
@@ -91,7 +90,6 @@ from pyflink.table.udf import FunctionContext, ScalarFunction, TableFunction, Ag
 
 __all__ = [
     'AggregateFunction',
-    'BatchTableEnvironment',
     'CsvTableSink',
     'CsvTableSource',
     'DataTypes',
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index ec49fb4..2c0d930 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -21,7 +21,7 @@ import shutil
 import sys
 import tempfile
 
-from pyflink.table import BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import EnvironmentSettings, TableEnvironment
 from pyflink.table import expressions as expr
 
 
@@ -35,7 +35,7 @@ def word_count():
               "with the License"
 
     env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
-    t_env = BatchTableEnvironment.create(environment_settings=env_settings)
+    t_env = TableEnvironment.create(environment_settings=env_settings)
 
     # register Results table in table environment
     tmp_dir = tempfile.gettempdir()
@@ -67,9 +67,7 @@ def word_count():
     table = t_env.from_elements(elements, ["word", "count"])
     table.group_by(table.word) \
          .select(table.word, expr.lit(1).count.alias('count')) \
-         .insert_into("Results")
-
-    t_env.execute("word_count")
+         .execute_insert("Results")
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index ccb3aa4..9e08278 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -44,7 +44,7 @@ class Table(object):
 
     """
     A :class:`~pyflink.table.Table` is the core component of the Table API.
-    Similar to how the batch and streaming APIs have DataSet and DataStream,
+    Similar to how the DataStream API has DataStream,
     the Table API is built around :class:`~pyflink.table.Table`.
 
     Use the methods of :class:`~pyflink.table.Table` to transform data.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 2114dad..82d4557 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -30,13 +30,12 @@ from pyflink.common.typeinfo import TypeInformation
 from pyflink.datastream.data_stream import DataStream
 
 from pyflink.common import JobExecutionResult
-from pyflink.dataset import ExecutionEnvironment
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \
     Module, ModuleEntry, TableSink
 from pyflink.table.catalog import Catalog
-from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescriptor, \
+from pyflink.table.descriptors import StreamTableDescriptor, \
     ConnectorDescriptor, ConnectTableDescriptor
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.statement_set import StatementSet
@@ -46,14 +45,13 @@ from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, D
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema, \
     _to_java_data_type
 from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
-    UserDefinedAggregateFunctionWrapper, udtaf, TableAggregateFunction
+    udtaf, TableAggregateFunction
 from pyflink.table.utils import to_expression_jarray
 from pyflink.util import java_utils
 from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
     to_j_explain_detail_arr, to_jarray
 
 __all__ = [
-    'BatchTableEnvironment',
     'StreamTableEnvironment',
     'TableEnvironment'
 ]
@@ -93,7 +91,6 @@ class TableEnvironment(object):
 
     def __init__(self, j_tenv, serializer=PickleSerializer()):
         self._j_tenv = j_tenv
-        self._is_blink_planner = TableEnvironment._judge_blink_planner(j_tenv)
         self._serializer = serializer
         # When running in MiniCluster, launch the Python UDF worker using the Python executable
         # specified by sys.executable if users have not specified it explicitly via configuration
@@ -115,16 +112,6 @@ class TableEnvironment(object):
             environment_settings._j_environment_settings)
         return TableEnvironment(j_tenv)
 
-    @staticmethod
-    def _judge_blink_planner(j_tenv):
-        if "getPlanner" not in dir(j_tenv):
-            return False
-        else:
-            j_planner_class = j_tenv.getPlanner().getClass()
-            j_blink_planner_class = get_java_class(
-                get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
-            return j_blink_planner_class.isAssignableFrom(j_planner_class)
-
     def from_table_source(self, table_source: 'TableSource') -> 'Table':
         """
         Creates a table from a table source.
@@ -1082,8 +1069,7 @@ class TableEnvironment(object):
             .loadClass(function_class_name).newInstance()
         # this is a temporary solution and will be unified later when we use the new type
         # system(DataType) to replace the old type system(TypeInformation).
-        if (self._is_blink_planner and not isinstance(self, StreamTableEnvironment)) or \
-                self.__class__ == TableEnvironment:
+        if not isinstance(self, StreamTableEnvironment) or self.__class__ == TableEnvironment:
             if self._is_table_function(java_function):
                 self._register_table_function(name, java_function)
             elif self._is_aggregate_function(java_function):
@@ -1128,8 +1114,7 @@ class TableEnvironment(object):
         java_function = function._java_user_defined_function()
         # this is a temporary solution and will be unified later when we use the new type
         # system(DataType) to replace the old type system(TypeInformation).
-        if (self._is_blink_planner and isinstance(self, BatchTableEnvironment)) or \
-                self.__class__ == TableEnvironment:
+        if self.__class__ == TableEnvironment:
             if self._is_table_function(java_function):
                 self._register_table_function(name, java_function)
             elif self._is_aggregate_function(java_function):
@@ -1442,14 +1427,10 @@ class TableEnvironment(object):
             execution_config = self._get_j_env().getConfig()
             gateway = get_gateway()
             j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
-            if self._is_blink_planner:
-                PythonTableUtils = gateway.jvm \
-                    .org.apache.flink.table.planner.utils.python.PythonTableUtils
-                PythonInputFormatTableSource = gateway.jvm \
-                    .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
-            else:
-                PythonTableUtils = gateway.jvm.PythonTableUtils
-                PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
+            PythonTableUtils = gateway.jvm \
+                .org.apache.flink.table.planner.utils.python.PythonTableUtils
+            PythonInputFormatTableSource = gateway.jvm \
+                .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
             j_input_format = PythonTableUtils.getInputFormat(
                 j_objs, row_type_info, execution_config)
             j_table_source = PythonInputFormatTableSource(
@@ -1489,10 +1470,6 @@ class TableEnvironment(object):
         .. versionadded:: 1.11.0
         """
 
-        if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
-            raise TypeError("It doesn't support to convert from Pandas DataFrame in the batch "
-                            "mode of old planner")
-
         import pandas as pd
         if not isinstance(pdf, pd.DataFrame):
             raise TypeError("Unsupported type, expected pandas.DataFrame, got %s" % type(pdf))
@@ -1535,9 +1512,8 @@ class TableEnvironment(object):
 
             data_type = jvm.org.apache.flink.table.types.utils.TypeConversions\
                 .fromLegacyInfoToDataType(_to_java_type(result_type)).notNull()
-            if self._is_blink_planner:
-                data_type = data_type.bridgedTo(
-                    load_java_class('org.apache.flink.table.data.RowData'))
+            data_type = data_type.bridgedTo(
+                load_java_class('org.apache.flink.table.data.RowData'))
 
             j_arrow_table_source = \
                 jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
@@ -1566,13 +1542,7 @@ class TableEnvironment(object):
             j_configuration.setString(config_key, ";".join(jar_urls_set))
 
     def _get_j_env(self):
-        if self._is_blink_planner:
-            return self._j_tenv.getPlanner().getExecEnv()
-        else:
-            try:
-                return self._j_tenv.execEnv()
-            except:
-                return self._j_tenv.getPlanner().getExecutionEnvironment()
+        return self._j_tenv.getPlanner().getExecEnv()
 
     @staticmethod
     def _is_table_function(java_function):
@@ -1618,10 +1588,6 @@ class TableEnvironment(object):
         self._add_jars_to_j_env_config(classpaths_key)
 
     def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
-        if isinstance(function, (AggregateFunction, TableAggregateFunction,
-                                 UserDefinedAggregateFunctionWrapper)):
-            if not self._is_blink_planner:
-                raise Exception("Python UDAF and UDTAF are only supported in blink planner")
         if isinstance(function, AggregateFunction):
             function = udaf(function,
                             result_type=function.get_result_type(),
@@ -1794,147 +1760,3 @@ class StreamTableEnvironment(TableEnvironment):
         """
         j_data_stream = self._j_tenv.toRetractStream(table._j_table, type_info.get_java_type_info())
         return DataStream(j_data_stream=j_data_stream)
-
-
-class BatchTableEnvironment(TableEnvironment):
-    """
-    .. note:: BatchTableEnvironment will be dropped in Flink 1.14 because it only supports the old
-              planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, which
-              supports both batch and streaming. More advanced operations previously covered by
-              the DataSet API can now use the DataStream API in BATCH execution mode.
-    """
-
-    def __init__(self, j_tenv):
-        super(BatchTableEnvironment, self).__init__(j_tenv)
-        self._j_tenv = j_tenv
-
-    def connect(self, connector_descriptor: ConnectorDescriptor) -> \
-            Union[BatchTableDescriptor, StreamTableDescriptor]:
-        """
-        Creates a temporary table from a descriptor.
-
-        Descriptors allow for declaring the communication to external systems in an
-        implementation-agnostic way. The classpath is scanned for suitable table factories that
-        match the desired configuration.
-
-        The following example shows how to read from a connector using a JSON format and
-        registering a temporary table as "MyTable":
-        ::
-
-            >>> table_env \\
-            ...     .connect(ExternalSystemXYZ()
-            ...              .version("0.11")) \\
-            ...     .with_format(Json()
-            ...                  .json_schema("{...}")
-            ...                  .fail_on_missing_field(False)) \\
-            ...     .with_schema(Schema()
-            ...                  .field("user-name", "VARCHAR")
-            ...                  .from_origin_field("u_name")
-            ...                  .field("count", "DECIMAL")) \\
-            ...     .create_temporary_table("MyTable")
-
-        :param connector_descriptor: Connector descriptor describing the external system.
-        :return: A :class:`~pyflink.table.descriptors.BatchTableDescriptor` or a
-                 :class:`~pyflink.table.descriptors.StreamTableDescriptor` (for blink planner) used
-                 to build the temporary table.
-
-        .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
-        """
-        warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
-        gateway = get_gateway()
-        blink_t_env_class = get_java_class(
-            gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
-        if blink_t_env_class == self._j_tenv.getClass():
-            return StreamTableDescriptor(
-                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
-        else:
-            return BatchTableDescriptor(
-                self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
-
-    @staticmethod
-    def create(execution_environment: ExecutionEnvironment = None,  # type: ignore
-               table_config: TableConfig = None,
-               environment_settings: EnvironmentSettings = None) -> 'BatchTableEnvironment':
-        """
-        Creates a :class:`~pyflink.table.BatchTableEnvironment`.
-
-        Example:
-        ::
-
-            # create with ExecutionEnvironment.
-            >>> env = ExecutionEnvironment.get_execution_environment()
-            >>> table_env = BatchTableEnvironment.create(env)
-            # create with ExecutionEnvironment and TableConfig.
-            >>> table_config = TableConfig()
-            >>> table_config.set_null_check(False)
-            >>> table_env = BatchTableEnvironment.create(env, table_config)
-            # create with EnvironmentSettings.
-            >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\
-            ...     .use_blink_planner().build()
-            >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
-
-        :param execution_environment: The batch :class:`~pyflink.dataset.ExecutionEnvironment` of
-                                      the TableEnvironment.
-        :param table_config: The configuration of the TableEnvironment, optional.
-        :param environment_settings: The environment settings used to instantiate the
-                                     TableEnvironment. It provides the interfaces about planner
-                                     selection(flink or blink), optional.
-        :return: The BatchTableEnvironment created from given ExecutionEnvironment and
-                 configuration.
-
-        .. note:: This part of the API will be dropped in Flink 1.14 because it only supports the
-                  old planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, it
-                  supports both batch and streaming. For more advanced operations, the new batch
-                  mode of the DataStream API might be useful.
-        """
-        warnings.warn(
-            "Deprecated in 1.14. Use the unified TableEnvironment instead.",
-            DeprecationWarning)
-        if execution_environment is None and \
-                table_config is None and \
-                environment_settings is None:
-            raise ValueError("No argument found, the param 'execution_environment' "
-                             "or 'environment_settings' is required.")
-        elif execution_environment is None and \
-                table_config is not None and \
-                environment_settings is None:
-            raise ValueError("Only the param 'table_config' is found, "
-                             "the param 'execution_environment' is also required.")
-        elif execution_environment is not None and \
-                environment_settings is not None:
-            raise ValueError("The param 'execution_environment' and "
-                             "'environment_settings' cannot be used at the same time")
-        elif table_config is not None and \
-                environment_settings is not None:
-            raise ValueError("The param 'table_config' and "
-                             "'environment_settings' cannot be used at the same time")
-
-        gateway = get_gateway()
-        if environment_settings is not None:
-            if environment_settings.is_streaming_mode():
-                raise ValueError("The environment settings for BatchTableEnvironment must be "
-                                 "set to batch mode.")
-            JEnvironmentSettings = get_gateway().jvm.org.apache.flink.table.api.EnvironmentSettings
-
-            old_planner_class_name = EnvironmentSettings.new_instance().in_batch_mode() \
-                .use_old_planner().build()._j_environment_settings \
-                .toPlannerProperties()[JEnvironmentSettings.CLASS_NAME]
-            planner_properties = environment_settings._j_environment_settings.toPlannerProperties()
-            if JEnvironmentSettings.CLASS_NAME in planner_properties and \
-                    planner_properties[JEnvironmentSettings.CLASS_NAME] == old_planner_class_name:
-                # The Java EnvironmentSettings API does not support creating table environment with
-                # old planner. Create it from other API.
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    ExecutionEnvironment.get_execution_environment()._j_execution_environment)
-            else:
-                j_tenv = gateway.jvm.TableEnvironment.create(
-                    environment_settings._j_environment_settings)
-        else:
-            if table_config is None:
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    execution_environment._j_execution_environment)
-            else:
-                j_tenv = gateway.jvm.BatchTableEnvironment.create(
-                    execution_environment._j_execution_environment,
-                    table_config._j_table_config)
-        return BatchTableEnvironment(j_tenv)
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index c51974f..f09b1fa 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -21,7 +21,7 @@ import datetime
 from decimal import Decimal
 
 from pyflink.common import Row
-from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import DataTypes
 from pyflink.table.expressions import row
 from pyflink.table.tests.test_types import PythonOnlyPoint, PythonOnlyUDT
 from pyflink.testing import source_sink_utils
@@ -122,50 +122,6 @@ class StreamTableCalcTests(PyFlinkBlinkStreamTableTestCase):
         expected = ['+I[1, abc, 2.0]', '+I[2, def, 3.0]']
         self.assert_equals(actual, expected)
 
-    def test_blink_from_element(self):
-        t_env = BatchTableEnvironment.create(environment_settings=EnvironmentSettings
-                                             .new_instance().use_blink_planner()
-                                             .in_batch_mode().build())
-        field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
-                       "i", "j", "k", "l", "m", "n", "o", "p", "q"]
-        field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(), DataTypes.STRING(),
-                       DataTypes.STRING(), DataTypes.DATE(),
-                       DataTypes.TIME(),
-                       DataTypes.TIMESTAMP(3),
-                       DataTypes.INTERVAL(DataTypes.SECOND(3)),
-                       DataTypes.ARRAY(DataTypes.DOUBLE()),
-                       DataTypes.ARRAY(DataTypes.DOUBLE(False)),
-                       DataTypes.ARRAY(DataTypes.STRING()),
-                       DataTypes.ARRAY(DataTypes.DATE()),
-                       DataTypes.DECIMAL(38, 18),
-                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]),
-                       DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()),
-                       DataTypes.BYTES(),
-                       PythonOnlyUDT()]
-        schema = DataTypes.ROW(
-            list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
-                     field_names,
-                     field_types)))
-        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
-        t_env.register_table_sink("Results", table_sink)
-        t = t_env.from_elements(
-            [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), datetime.time(1, 0, 0),
-              datetime.datetime(1970, 1, 2, 0, 0),
-              datetime.timedelta(days=1, microseconds=10),
-              [1.0, None], array.array("d", [1.0, 2.0]),
-              ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a", "b")(1, 2.0),
-              {"key": 1.0}, bytearray(b'ABCD'),
-              PythonOnlyPoint(3.0, 4.0))],
-            schema)
-        t.execute_insert("Results").wait()
-        actual = source_sink_utils.results()
-
-        expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00:00, 1970-01-02 00:00:00.0, '
-                    '86400000, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
-                    '1.000000000000000000, +I[1, 2.0], {key=1.0}, [65, 66, 67, 68], [3.0, 4.0]]']
-        self.assert_equals(actual, expected)
-
 
 if __name__ == '__main__':
     import unittest
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b3a0bca..7567b93 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -27,9 +27,7 @@ from pyflink.table import expressions as expr
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import (PyFlinkBlinkStreamTableTestCase,
-                                             PyFlinkBlinkBatchTableTestCase,
-                                             PyFlinkOldStreamTableTestCase,
-                                             PyFlinkOldBatchTableTestCase)
+                                             PyFlinkBlinkBatchTableTestCase)
 
 
 class DependencyTests(object):
@@ -67,35 +65,6 @@ class DependencyTests(object):
         self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
 
 
-class FlinkStreamDependencyTests(DependencyTests, PyFlinkOldStreamTableTestCase):
-
-    pass
-
-
-class FlinkBatchDependencyTests(PyFlinkOldBatchTableTestCase):
-
-    def test_add_python_file(self):
-        python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
-        os.mkdir(python_file_dir)
-        python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py")
-        with open(python_file_path, 'w') as f:
-            f.write("def add_two(a):\n    return a + 2")
-        self.t_env.add_python_file(python_file_path)
-
-        def plus_two(i):
-            from test_dependency_manage_lib import add_two
-            return add_two(i)
-
-        self.t_env.create_temporary_system_function(
-            "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
-
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        t = t.select(expr.call('add_two', t.a), t.a)
-
-        result = self.collect(t)
-        self.assertEqual(result, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-
-
 class BlinkBatchDependencyTests(DependencyTests, PyFlinkBlinkBatchTableTestCase):
 
     pass
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 92bada3..e15bf7f 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -25,9 +25,7 @@ from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafk
                                        CustomFormatDescriptor)
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes
-from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkOldStreamTableTestCase,
-                                             PyFlinkOldBatchTableTestCase,
-                                             _load_specific_flink_module_jars)
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, _load_specific_flink_module_jars)
 
 
 class FileSystemDescriptorTests(PyFlinkTestCase):
@@ -1080,58 +1078,6 @@ class AbstractTableDescriptorTests(object):
             assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
 
 
-class StreamTableDescriptorTests(PyFlinkOldStreamTableTestCase, AbstractTableDescriptorTests):
-
-    def test_in_append_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor\
-            .with_format(OldCsv())\
-            .in_append_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'append',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-    def test_in_retract_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor \
-            .with_format(OldCsv()) \
-            .in_retract_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'retract',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-    def test_in_upsert_mode(self):
-        descriptor = self.t_env.connect(FileSystem())
-
-        descriptor = descriptor \
-            .with_format(OldCsv()) \
-            .in_upsert_mode()
-
-        properties = descriptor.to_properties()
-        expected = {'update-mode': 'upsert',
-                    'format.type': 'csv',
-                    'format.property-version': '1',
-                    'connector.property-version': '1',
-                    'connector.type': 'filesystem'}
-        assert properties == expected
-
-
-class BatchTableDescriptorTests(PyFlinkOldBatchTableTestCase, AbstractTableDescriptorTests):
-    pass
-
-
 if __name__ == '__main__':
     import unittest
 
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 4f20e24..c9c0986 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -24,7 +24,7 @@ from pyflink.common import Row
 from pyflink.table.types import DataTypes
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkOldStreamTableTestCase
+    PyFlinkBlinkStreamTableTestCase
 
 
 class PandasConversionTestBase(object):
@@ -172,11 +172,6 @@ class PandasConversionITTests(PandasConversionTestBase):
             self.assertTrue(expected_field == result_field)
 
 
-class StreamPandasConversionTests(PandasConversionITTests,
-                                  PyFlinkOldStreamTableTestCase):
-    pass
-
-
 class BlinkBatchPandasConversionTests(PandasConversionTests,
                                       PandasConversionITTests,
                                       PyFlinkBlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 8d3334d..77061ec 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -24,9 +24,8 @@ from pyflink.table import DataTypes
 from pyflink.table.tests.test_udf import SubtractOne
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, \
-    PyFlinkTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkTestCase
 
 
 class PandasUDFTests(PyFlinkTestCase):
@@ -307,9 +306,6 @@ class PandasUDFITTests(object):
         with self.assertRaisesRegex(Py4JJavaError, expected_regex=msg):
             t.select(result_type_not_series(t.a)).to_pandas()
 
-
-class BlinkPandasUDFITTests(object):
-
     def test_data_types_only_supported_in_blink_planner(self):
         import pandas as pd
 
@@ -342,34 +338,12 @@ class BlinkPandasUDFITTests(object):
         self.assert_equals(actual, ["+I[1970-01-02T00:00:00.123Z]"])
 
 
-class StreamPandasUDFITTests(PandasUDFITTests,
-                             PyFlinkOldStreamTableTestCase):
-    pass
-
-
-class BatchPandasUDFITTests(PyFlinkOldBatchTableTestCase):
-
-    def test_basic_functionality(self):
-        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT(), func_type="pandas")
-
-        # general Python UDF
-        subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
-        t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
-        t = t.where(add_one(t.b) <= 3) \
-            .select(t.a, t.b + 1, add(t.a + 1, subtract_one(t.c)) + 2, add(add_one(t.a), 1))
-        result = self.collect(t)
-        self.assert_equals(result, ["+I[1, 3, 6, 3]", "+I[3, 2, 14, 5]"])
-
-
 class BlinkBatchPandasUDFITTests(PandasUDFITTests,
-                                 BlinkPandasUDFITTests,
                                  PyFlinkBlinkBatchTableTestCase):
     pass
 
 
 class BlinkStreamPandasUDFITTests(PandasUDFITTests,
-                                  BlinkPandasUDFITTests,
                                   PyFlinkBlinkStreamTableTestCase):
     pass
 
diff --git a/flink-python/pyflink/table/tests/test_set_operation.py b/flink-python/pyflink/table/tests/test_set_operation.py
index 51da302..c25ec59 100644
--- a/flink-python/pyflink/table/tests/test_set_operation.py
+++ b/flink-python/pyflink/table/tests/test_set_operation.py
@@ -16,10 +16,10 @@
 # # limitations under the License.
 ################################################################################
 
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
 
 
-class StreamTableSetOperationTests(PyFlinkOldBatchTableTestCase):
+class StreamTableSetOperationTests(PyFlinkBlinkBatchTableTestCase):
 
     data1 = [(1, "Hi", "Hello")]
     data2 = [(3, "Hello", "Hello")]
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 03dd197..60dae68 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -23,40 +23,6 @@ class ShellExampleTests(PyFlinkTestCase):
     If these tests failed, please fix these examples code and copy them to shell.py
     """
 
-    def test_batch_case(self):
-        from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
-        # example begin
-
-        import tempfile
-        import os
-        import shutil
-        sink_path = tempfile.gettempdir() + '/batch.csv'
-        if os.path.exists(sink_path):
-            if os.path.isfile(sink_path):
-                os.remove(sink_path)
-            else:
-                shutil.rmtree(sink_path)
-        b_env.set_parallelism(1)
-        t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-        bt_env.connect(FileSystem().path(sink_path))\
-            .with_format(OldCsv()
-                         .field_delimiter(',')
-                         .field("a", DataTypes.BIGINT())
-                         .field("b", DataTypes.STRING())
-                         .field("c", DataTypes.STRING()))\
-            .with_schema(Schema()
-                         .field("a", DataTypes.BIGINT())
-                         .field("b", DataTypes.STRING())
-                         .field("c", DataTypes.STRING()))\
-            .create_temporary_table("batch_sink")
-
-        t.select("a + 1, b, c").execute_insert("batch_sink").wait()
-
-        # verify code, do not copy these code to shell.py
-        with open(sink_path, 'r') as f:
-            lines = f.read()
-            self.assertEqual(lines, '2,hi,hello\n' + '3,hi,hello\n')
-
     def test_stream_case(self):
         from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
         # example begin
diff --git a/flink-python/pyflink/table/tests/test_sort.py b/flink-python/pyflink/table/tests/test_sort.py
index d0b787c..3e78689 100644
--- a/flink-python/pyflink/table/tests/test_sort.py
+++ b/flink-python/pyflink/table/tests/test_sort.py
@@ -16,10 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
 
 
-class BatchTableSortTests(PyFlinkOldBatchTableTestCase):
+class BatchTableSortTests(PyFlinkBlinkBatchTableTestCase):
 
     def test_order_by_offset_fetch(self):
         t = self.t_env.from_elements([(1, "Hello")], ["a", "b"])
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 5a102ac..27ebc87 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -24,7 +24,7 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes, ResultKind
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
-    PyFlinkOldBatchTableTestCase, PyFlinkTestCase
+    PyFlinkTestCase
 
 
 class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
@@ -110,16 +110,6 @@ class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
         self.assert_equals(actual, expected)
 
 
-class BatchSqlTests(PyFlinkOldBatchTableTestCase):
-
-    def test_sql_ddl(self):
-        self.t_env.execute_sql("create temporary function func1 as "
-                               "'pyflink.table.tests.test_udf.add' language python")
-        table = self.t_env.from_elements([(1, 2)]).alias("a, b").select("func1(a, b)")
-        plan = table.explain()
-        self.assertTrue(plan.find("DataSetPythonCalc(select=[add(f0, f1) AS _c0])") >= 0)
-
-
 class JavaSqlTests(PyFlinkTestCase):
     """
     We need to start these Java tests from python process to make sure that Python environment is
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3774178..27cb89a 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -17,33 +17,25 @@
 ################################################################################
 import datetime
 import decimal
-import glob
 import os
-import pathlib
 import sys
 from py4j.protocol import Py4JJavaError
 from pyflink.table.udf import udf
 
 from pyflink.common import RowKind
 from pyflink.common.typeinfo import Types
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
-from pyflink.find_flink_home import _find_flink_source_root
 from pyflink.java_gateway import get_gateway
 from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings, \
     Module, ResultKind, ModuleEntry
 from pyflink.table.descriptors import FileSystem, OldCsv, Schema
 from pyflink.table.explain_detail import ExplainDetail
 from pyflink.table.expressions import col
-from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import BatchTableEnvironment
 from pyflink.table.types import RowType, Row
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
-    PyFlinkLegacyBlinkBatchTableTestCase, PyFlinkLegacyFlinkStreamTableTestCase, \
-    PyFlinkLegacyBlinkStreamTableTestCase, _load_specific_flink_module_jars
+from pyflink.testing.test_case_utils import \
+    PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
+    _load_specific_flink_module_jars
 from pyflink.util.java_utils import get_j_env_configuration
 
 
@@ -201,495 +193,6 @@ class TableEnvironmentTest(object):
         self.assert_equals(actual, expected)
 
 
-class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldStreamTableTestCase):
-
-    def test_register_table_source_from_path(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
-        t_env.register_table_source("Source", csv_source)
-
-        result = t_env.from_path("Source")
-        self.assertEqual(
-            'CatalogTable: (identifier: [`default_catalog`.`default_database`.`Source`]'
-            ', fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_register_table_sink(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
-        actual = source_sink_utils.results()
-
-        expected = ['+I[1, Hi, Hello]']
-        self.assert_equals(actual, expected)
-
-    def test_from_table_source(self):
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
-
-        result = self.t_env.from_table_source(csv_source)
-        self.assertEqual(
-            'TableSource: (fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_list_tables(self):
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = []
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-        t_env = self.t_env
-        t_env.register_table_source("Orders", csv_source)
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "Results",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        actual = t_env.list_tables()
-
-        expected = ['Orders', 'Results', 'Sinks']
-        self.assert_equals(actual, expected)
-
-    def test_temporary_views(self):
-        t_env = self.t_env
-        t_env.create_temporary_view(
-            "temporary_view_1",
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-        t_env.create_temporary_view(
-            "temporary_view_2",
-            t_env.from_elements([(1, 'Hi')], ['a', 'b']))
-
-        actual = t_env.list_temporary_views()
-        expected = ['temporary_view_1', 'temporary_view_2']
-        self.assert_equals(actual, expected)
-
-        t_env.drop_temporary_view("temporary_view_1")
-        actual = t_env.list_temporary_views()
-        expected = ['temporary_view_2']
-        self.assert_equals(actual, expected)
-
-    def test_from_path(self):
-        t_env = self.t_env
-        t_env.create_temporary_view(
-            "temporary_view_1",
-            t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
-        result = t_env.from_path("temporary_view_1")
-        self.assertEqual(
-            'CatalogTable: (identifier: [`default_catalog`.`default_database`.`temporary_view_1`]'
-            ', fields: [a, b, c])',
-            result._j_table.getQueryOperation().asSummaryString())
-
-    def test_insert_into(self):
-        t_env = self.t_env
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "Sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
-        actual = source_sink_utils.results()
-        expected = ['+I[1, Hi, Hello]']
-        self.assert_equals(actual, expected)
-
-    def test_statement_set(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "sink2",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        stmt_set = t_env.create_statement_set()
-
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
-            .add_insert("sink2", source.filter("a < 100"), False)
-
-        actual = stmt_set.explain(ExplainDetail.CHANGELOG_MODE)
-        assert isinstance(actual, str)
-
-    def test_explain_with_multi_sinks(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-        t_env.register_table_sink(
-            "sink2",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        stmt_set = t_env.create_statement_set()
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
-        stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
-        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
-        assert isinstance(actual, str)
-
-    def test_explain_sql_without_explain_detail(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        result = t_env.explain_sql("select a + 1, b, c from %s" % source)
-
-        assert isinstance(result, str)
-
-    def test_explain_sql_with_explain_detail(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sinks",
-            source_sink_utils.TestAppendSink(field_names, field_types))
-
-        result = t_env.explain_sql(
-            "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE)
-
-        assert isinstance(result, str)
-
-    def test_create_table_environment(self):
-        table_config = TableConfig()
-        table_config.set_max_generated_code_length(32000)
-        table_config.set_null_check(False)
-        table_config.set_local_timezone("Asia/Shanghai")
-
-        env = StreamExecutionEnvironment.get_execution_environment()
-        t_env = StreamTableEnvironment.create(env, table_config)
-
-        readed_table_config = t_env.get_config()
-
-        self.assertFalse(readed_table_config.get_null_check())
-        self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
-        self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
-    def test_create_table_environment_with_blink_planner(self):
-        t_env = StreamTableEnvironment.create(
-            StreamExecutionEnvironment.get_execution_environment(),
-            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.StreamPlanner")
-
-        t_env = StreamTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.StreamPlanner")
-
-        t_env = StreamTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().use_old_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.StreamPlanner")
-
-    def test_table_environment_with_blink_planner(self):
-        env = StreamExecutionEnvironment.get_execution_environment()
-        env.set_parallelism(1)
-        t_env = StreamTableEnvironment.create(
-            env,
-            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        sink_path = os.path.join(self.tempdir + '/result.csv')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
-        t_env.register_table_source("source", csv_source)
-
-        t_env.register_table_sink(
-            "sink",
-            CsvTableSink(field_names, field_types, sink_path))
-        source = t_env.from_path("source")
-
-        result = source.alias("a, b, c").select("1 + a, b, c")
-
-        result.execute_insert("sink").wait()
-
-        results = []
-        with open(sink_path, 'r') as f:
-            results.append(f.readline())
-            results.append(f.readline())
-
-        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
-    def test_collect_null_value_result(self):
-        element_data = [(1, None, 'a'),
-                        (3, 4, 'b'),
-                        (5, None, 'a'),
-                        (7, 8, 'b')]
-        source = self.t_env.from_elements(element_data,
-                                          DataTypes.ROW([DataTypes.FIELD('a', DataTypes.INT()),
-                                                         DataTypes.FIELD('b', DataTypes.INT()),
-                                                         DataTypes.FIELD('c', DataTypes.STRING())]))
-        table_result = source.execute()
-        expected_result = [Row(1, None, 'a'), Row(3, 4, 'b'), Row(5, None, 'a'),
-                           Row(7, 8, 'b')]
-        with table_result.collect() as results:
-            collected_result = []
-            for result in results:
-                collected_result.append(result)
-            self.assertEqual(collected_result, expected_result)
-
-    def test_set_jars(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_t_env)
-
-    def test_set_jars_with_execute_sql(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_execute_sql)
-
-    def test_set_jars_with_statement_set(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_statement_set)
-
-    def test_set_jars_with_table(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table)
-
-    def test_set_jars_with_table_execute_insert(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_execute_insert)
-
-    def test_set_jars_with_table_to_pandas(self):
-        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_to_pandas)
-
-    def test_set_classpaths(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_t_env)
-
-    def test_set_classpaths_with_execute_sql(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_execute_sql)
-
-    def test_set_classpaths_with_statement_set(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_statement_set)
-
-    def test_set_classpaths_with_table(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table)
-
-    def test_set_classpaths_with_table_execute_insert(self):
-        self.verify_set_java_dependencies(
-            "pipeline.classpaths", self.execute_with_table_execute_insert)
-
-    def test_set_classpaths_with_table_to_pandas(self):
-        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table_to_pandas)
-
-    def execute_with_t_env(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        source.select("func1(a, b), func2(a, b)").execute_insert("sink").wait()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_execute_sql(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        t_env.create_temporary_view("source", source)
-        t_env.execute_sql("select func1(a, b), func2(a, b) from source") \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-
-    def execute_with_statement_set(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        t_env.create_statement_set().add_insert("sink", result).execute() \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_table(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.execute() \
-            .get_job_client() \
-            .get_job_execution_result() \
-            .result()
-
-    def execute_with_table_execute_insert(self, t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.execute_insert("sink").wait()
-        actual = source_sink_utils.results()
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
-        self.assert_equals(actual, expected)
-
-    @staticmethod
-    def execute_with_table_to_pandas(t_env):
-        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
-        result = source.select("func1(a, b), func2(a, b)")
-        result.to_pandas()
-
-    def verify_set_java_dependencies(self, config_key, executor):
-        original_class_loader = \
-            get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-        try:
-            jar_urls = []
-            func1_class_name = "org.apache.flink.python.util.TestScalarFunction1"
-            func2_class_name = "org.apache.flink.python.util.TestScalarFunction2"
-            func1_jar_pattern = "flink-python/target/artifacts/testUdf1.jar"
-            func2_jar_pattern = "flink-python/target/artifacts/testUdf2.jar"
-            self.ensure_jar_not_loaded(func1_class_name, func1_jar_pattern)
-            self.ensure_jar_not_loaded(func2_class_name, func2_jar_pattern)
-            jar_urls.extend(self.get_jar_url(func1_jar_pattern))
-            jar_urls.extend(self.get_jar_url(func2_jar_pattern))
-
-            # test set the "pipeline.jars" multiple times
-            self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
-            first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
-            self.t_env.get_config().get_configuration().set_string(config_key, jar_urls[0])
-            self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
-            second_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
-            self.assertEqual(first_class_loader, second_class_loader)
-
-            self.t_env.register_java_function("func1", func1_class_name)
-            self.t_env.register_java_function("func2", func2_class_name)
-            table_sink = source_sink_utils.TestAppendSink(
-                ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
-            self.t_env.register_table_sink("sink", table_sink)
-
-            executor(self.t_env)
-        finally:
-            get_gateway().jvm.Thread.currentThread().setContextClassLoader(original_class_loader)
-
-    def ensure_jar_not_loaded(self, func_class_name, jar_filename_pattern):
-        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
-        if not test_jars:
-            self.fail("'%s' is not available. Please compile the test jars first."
-                      % jar_filename_pattern)
-        try:
-            self.t_env.register_java_function("func", func_class_name)
-        except Py4JJavaError:
-            pass
-        else:
-            self.fail("The scalar function '%s' should not be able to be loaded. Please remove "
-                      "the '%s' from the classpath of the PythonGatewayServer process." %
-                      (func_class_name, jar_filename_pattern))
-
-    @staticmethod
-    def get_jar_url(jar_filename_pattern):
-        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
-        return [pathlib.Path(jar_path).as_uri() for jar_path in test_jars]
-
-    def test_collect_for_all_data_types(self):
-        expected_result = [Row(1, None, 1, True, 32767, -2147483648, 1.23,
-                               1.98932, bytearray(b'pyflink'), 'pyflink',
-                               datetime.date(2014, 9, 13), datetime.time(12, 0),
-                               datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
-                               [Row(['[pyflink]']), Row(['[pyflink]']),
-                                Row(['[pyflink]'])], {1: Row(['[flink]']), 2: Row(['[pyflink]'])},
-                               decimal.Decimal('1000000000000000000.05'),
-                               decimal.Decimal(
-                                   '1000000000000000000.05999999999999999899999999999'))]
-        source = self.t_env.from_elements([(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
-                                            bytearray(b'pyflink'), 'pyflink',
-                                            datetime.date(2014, 9, 13),
-                                            datetime.time(hour=12, minute=0, second=0,
-                                                          microsecond=123000),
-                                            datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
-                                            [Row(['pyflink']), Row(['pyflink']), Row(['pyflink'])],
-                                            {1: Row(['flink']), 2: Row(['pyflink'])},
-                                            decimal.Decimal('1000000000000000000.05'),
-                                            decimal.Decimal(
-                                                '1000000000000000000.0599999999999999989'
-                                                '9999999999'))],
-                                          DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                                         DataTypes.FIELD("b", DataTypes.BIGINT()),
-                                                         DataTypes.FIELD("c", DataTypes.TINYINT()),
-                                                         DataTypes.FIELD("d", DataTypes.BOOLEAN()),
-                                                         DataTypes.FIELD("e", DataTypes.SMALLINT()),
-                                                         DataTypes.FIELD("f", DataTypes.INT()),
-                                                         DataTypes.FIELD("g", DataTypes.FLOAT()),
-                                                         DataTypes.FIELD("h", DataTypes.DOUBLE()),
-                                                         DataTypes.FIELD("i", DataTypes.BYTES()),
-                                                         DataTypes.FIELD("j", DataTypes.STRING()),
-                                                         DataTypes.FIELD("k", DataTypes.DATE()),
-                                                         DataTypes.FIELD("l", DataTypes.TIME()),
-                                                         DataTypes.FIELD("m",
-                                                                         DataTypes.TIMESTAMP(3)),
-                                                         DataTypes.FIELD("n", DataTypes.ARRAY(
-                                                             DataTypes.ROW([DataTypes.FIELD('ss2',
-                                                                            DataTypes.STRING())]))),
-                                                         DataTypes.FIELD("o", DataTypes.MAP(
-                                                             DataTypes.BIGINT(), DataTypes.ROW(
-                                                                 [DataTypes.FIELD('ss',
-                                                                  DataTypes.STRING())]))),
-                                                         DataTypes.FIELD("p",
-                                                                         DataTypes.DECIMAL(38, 18)),
-                                                         DataTypes.FIELD("q",
-                                                                         DataTypes.DECIMAL(38,
-                                                                                           18))]))
-        table_result = source.execute()
-        with table_result.collect() as result:
-            collected_result = []
-            for i in result:
-                collected_result.append(i)
-            self.assertEqual(expected_result, collected_result)
-
-    def test_collect_with_retract(self):
-
-        expected_row_kinds = [RowKind.INSERT, RowKind.DELETE, RowKind.INSERT, RowKind.INSERT,
-                              RowKind.DELETE, RowKind.INSERT]
-        element_data = [(1, 2, 'a'),
-                        (3, 4, 'b'),
-                        (5, 6, 'a'),
-                        (7, 8, 'b')]
-        field_names = ['a', 'b', 'c']
-        source = self.t_env.from_elements(element_data, field_names)
-        table_result = self.t_env.execute_sql(
-            "SELECT SUM(a), c FROM %s group by c" % source)
-        with table_result.collect() as result:
-            collected_result = []
-            for i in result:
-                collected_result.append(i)
-
-            collected_result = [str(result) + ',' + str(result.get_row_kind())
-                                for result in collected_result]
-            expected_result = [Row(1, 'a'), Row(1, 'a'), Row(6, 'a'), Row(3, 'b'),
-                               Row(3, 'b'), Row(10, 'b')]
-            for i in range(len(expected_result)):
-                expected_result[i] = str(expected_result[i]) + ',' + str(expected_row_kinds[i])
-            expected_result.sort()
-            collected_result.sort()
-            self.assertEqual(expected_result, collected_result)
-
-
 class DataStreamConversionTestCases(object):
 
     def test_from_data_stream(self):
@@ -755,21 +258,6 @@ class DataStreamConversionTestCases(object):
         self.assertEqual(result, expected)
 
 
-class LegacyBlinkBatchTableEnvironmentTests(TableEnvironmentTest,
-                                            PyFlinkLegacyBlinkBatchTableTestCase):
-    pass
-
-
-class LegacyBlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
-                                             PyFlinkLegacyBlinkStreamTableTestCase):
-    pass
-
-
-class LegacyFlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
-                                             PyFlinkLegacyFlinkStreamTableTestCase):
-    pass
-
-
 class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamTableTestCase):
 
     def test_collect_with_retract(self):
@@ -841,117 +329,6 @@ class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamT
             self.assertEqual(expected_result, collected_result)
 
 
-class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldBatchTableTestCase):
-
-    def test_explain_with_multi_sinks(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            CsvTableSink(field_names, field_types, "path1"))
-        t_env.register_table_sink(
-            "sink2",
-            CsvTableSink(field_names, field_types, "path2"))
-
-        stmt_set = t_env.create_statement_set()
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
-        stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
-        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
-
-        assert isinstance(actual, str)
-
-    def test_statement_set(self):
-        t_env = self.t_env
-        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
-        t_env.register_table_sink(
-            "sink1",
-            CsvTableSink(field_names, field_types, "path1"))
-        t_env.register_table_sink(
-            "sink2",
-            CsvTableSink(field_names, field_types, "path2"))
-
-        stmt_set = t_env.create_statement_set()
-
-        stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
-            .add_insert("sink2", source.filter("a < 100"))
-
-        actual = stmt_set.explain()
-        assert isinstance(actual, str)
-
-    def test_create_table_environment(self):
-        table_config = TableConfig()
-        table_config.set_max_generated_code_length(32000)
-        table_config.set_null_check(False)
-        table_config.set_local_timezone("Asia/Shanghai")
-
-        env = ExecutionEnvironment.get_execution_environment()
-        t_env = BatchTableEnvironment.create(env, table_config)
-
-        readed_table_config = t_env.get_config()
-
-        self.assertFalse(readed_table_config.get_null_check())
-        self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
-        self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
-    def test_create_table_environment_with_old_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_old_planner().build())
-        self.assertEqual(
-            t_env._j_tenv.getClass().getName(),
-            "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl")
-
-    def test_create_table_environment_with_blink_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_blink_planner().build())
-
-        planner = t_env._j_tenv.getPlanner()
-
-        self.assertEqual(
-            planner.getClass().getName(),
-            "org.apache.flink.table.planner.delegation.BatchPlanner")
-
-    def test_table_environment_with_blink_planner(self):
-        t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
-            .use_blink_planner().build())
-
-        source_path = os.path.join(self.tempdir + '/streaming.csv')
-        sink_path = os.path.join(self.tempdir + '/results')
-        field_names = ["a", "b", "c"]
-        field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-        data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
-        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
-        t_env.register_table_source("source", csv_source)
-
-        t_env.register_table_sink(
-            "sink",
-            CsvTableSink(field_names, field_types, sink_path))
-        source = t_env.from_path("source")
-
-        result = source.alias("a, b, c").select("1 + a, b, c")
-
-        result.execute_insert("sink").wait()
-
-        results = []
-        for root, dirs, files in os.walk(sink_path):
-            for sub_file in files:
-                with open(os.path.join(root, sub_file), 'r') as f:
-                    line = f.readline()
-                    while line is not None and line != '':
-                        results.append(line)
-                        line = f.readline()
-
-        self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
-
 class BlinkBatchTableEnvironmentTests(PyFlinkBlinkBatchTableTestCase):
 
     def test_explain_with_multi_sinks(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 19cdc5f..c357554 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -24,9 +24,8 @@ import pytz
 from pyflink.table import DataTypes, expressions as expr
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
-    PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+    PyFlinkBlinkBatchTableTestCase
 
 
 class UserDefinedFunctionTests(object):
@@ -639,24 +638,6 @@ def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
     return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
 
 
-class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
-                                            PyFlinkOldStreamTableTestCase):
-    pass
-
-
-class PyFlinkBatchUserDefinedFunctionTests(PyFlinkOldBatchTableTestCase):
-
-    def test_chaining_scalar_function(self):
-        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
-        subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
-        t = self.t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
-        t = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
-
-        result = self.collect(t)
-        self.assertEqual(result, ["+I[3, 1, 1]", "+I[7, 2, 1]", "+I[4, 3, 1]"])
-
-
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_deterministic(self):
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 976b2f6..79d66ad 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -20,8 +20,8 @@ import unittest
 from pyflink.table import DataTypes
 from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
-    PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+    PyFlinkBlinkBatchTableTestCase
 
 
 class UserDefinedTableFunctionTests(object):
@@ -71,11 +71,6 @@ class UserDefinedTableFunctionTests(object):
         return source_sink_utils.results()
 
 
-class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
-                                                 PyFlinkOldStreamTableTestCase):
-    pass
-
-
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
     def test_execute_from_json_plan(self):
@@ -134,26 +129,6 @@ class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests,
     pass
 
 
-class PyFlinkBatchUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
-                                                PyFlinkOldBatchTableTestCase):
-    def _register_table_sink(self, field_names: list, field_types: list):
-        pass
-
-    def _get_output(self, t):
-        return self.collect(t)
-
-    def test_row_type_as_input_types_and_result_types(self):
-        # test input_types and result_types are DataTypes.ROW
-        a = udtf(lambda i: i,
-                 input_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
-                 result_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]))
-
-        self.assertEqual(a._input_types,
-                         [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
-        self.assertEqual(a._result_types,
-                         [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
-
-
 class MultiEmit(TableFunction, unittest.TestCase):
 
     def open(self, function_context):
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index e086ca4..dd1d56b 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -30,13 +30,10 @@ from py4j.protocol import Py4JJavaError
 
 from pyflink.common import JobExecutionResult
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
-from pyflink.table import TableConfig
 from pyflink.table.sources import CsvTableSource
-from pyflink.dataset.execution_environment import ExecutionEnvironment
 from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
 from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment, \
-    TableEnvironment
+from pyflink.table.table_environment import TableEnvironment
 from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import add_jars_to_context_class_loader, to_jarray
@@ -146,91 +143,6 @@ class PyFlinkTestCase(unittest.TestCase):
         return CsvTableSource(path, fields, data_types)
 
 
-class PyFlinkLegacyBlinkBatchTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Blink Batch TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyBlinkBatchTableTestCase, self).setUp()
-        self.t_env = BatchTableEnvironment.create(
-            environment_settings=EnvironmentSettings.new_instance()
-            .in_batch_mode().use_blink_planner().build())
-        self.t_env._j_tenv.getPlanner().getExecEnv().setParallelism(2)
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyBlinkStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Blink Batch TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyBlinkStreamTableTestCase, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = StreamTableEnvironment.create(
-            self.env,
-            environment_settings=EnvironmentSettings.new_instance()
-                .in_streaming_mode().use_blink_planner().build())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyFlinkStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for pure Flink Stream TableEnvironment tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkLegacyFlinkStreamTableTestCase, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = StreamTableEnvironment.create(
-            self.env,
-            environment_settings=EnvironmentSettings.new_instance()
-                .in_streaming_mode().use_old_planner().build())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldStreamTableTestCase(PyFlinkTestCase):
-    """
-    Base class for old planner stream tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkOldStreamTableTestCase, self).setUp()
-        self.t_env = TableEnvironment.create(
-            EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build())
-        self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldBatchTableTestCase(PyFlinkTestCase):
-    """
-    Base class for batch tests.
-    """
-
-    def setUp(self):
-        super(PyFlinkOldBatchTableTestCase, self).setUp()
-        self.env = ExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
-        self.t_env = BatchTableEnvironment.create(self.env, TableConfig())
-        self.t_env.get_config().get_configuration().set_string(
-            "python.fn-execution.bundle.size", "1")
-
-    def collect(self, table):
-        j_table = table._j_table
-        gateway = get_gateway()
-        row_result = self.t_env._j_tenv\
-            .toDataSet(j_table, gateway.jvm.Class.forName("org.apache.flink.types.Row")).collect()
-        string_result = [java_row.toString() for java_row in row_result]
-        return string_result
-
-
 class PyFlinkBlinkStreamTableTestCase(PyFlinkTestCase):
     """
     Base class for stream tests of blink planner.
diff --git a/flink-python/setup.py b/flink-python/setup.py
index efa6ffa..94b2019 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -269,7 +269,6 @@ try:
                 'pyflink.table',
                 'pyflink.util',
                 'pyflink.datastream',
-                'pyflink.dataset',
                 'pyflink.common',
                 'pyflink.fn_execution',
                 'pyflink.fn_execution.beam',
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index f381bf3..1264247 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -19,22 +19,19 @@
 package org.apache.flink.table.runtime.arrow;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.internal.TableEnvImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
 import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
 import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
@@ -664,7 +661,6 @@ public final class ArrowUtils {
         ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, baos);
         arrowStreamWriter.start();
 
-        ArrowWriter arrowWriter;
         Iterator<Row> results = table.execute().collect();
         Iterator<Row> appendOnlyResults;
         if (isAppendOnlyTable(table)) {
@@ -673,28 +669,21 @@ public final class ArrowUtils {
             appendOnlyResults = filterOutRetractRows(results);
         }
 
-        Iterator convertedResults;
-        if (isBlinkPlanner(table)) {
-            arrowWriter = createRowDataArrowWriter(root, rowType);
-            convertedResults =
-                    new Iterator<RowData>() {
-                        @Override
-                        public boolean hasNext() {
-                            return appendOnlyResults.hasNext();
-                        }
-
-                        @Override
-                        public RowData next() {
-                            DataFormatConverters.DataFormatConverter converter =
-                                    DataFormatConverters.getConverterForDataType(
-                                            defaultRowDataType);
-                            return (RowData) converter.toInternal(appendOnlyResults.next());
-                        }
-                    };
-        } else {
-            arrowWriter = createRowArrowWriter(root, rowType);
-            convertedResults = appendOnlyResults;
-        }
+        ArrowWriter arrowWriter = createRowDataArrowWriter(root, rowType);
+        Iterator convertedResults =
+                new Iterator<RowData>() {
+                    @Override
+                    public boolean hasNext() {
+                        return appendOnlyResults.hasNext();
+                    }
+
+                    @Override
+                    public RowData next() {
+                        DataFormatConverters.DataFormatConverter converter =
+                                DataFormatConverters.getConverterForDataType(defaultRowDataType);
+                        return (RowData) converter.toInternal(appendOnlyResults.next());
+                    }
+                };
 
         return new CustomIterator<byte[]>() {
             @Override
@@ -750,39 +739,22 @@ public final class ArrowUtils {
         return result.iterator();
     }
 
-    private static boolean isBlinkPlanner(Table table) {
+    private static boolean isStreamingMode(Table table) {
         TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
-        if (tableEnv instanceof TableEnvImpl) {
-            return false;
-        } else if (tableEnv instanceof TableEnvironmentImpl) {
-            Planner planner = ((TableEnvironmentImpl) tableEnv).getPlanner();
-            return planner instanceof PlannerBase;
+        if (tableEnv instanceof TableEnvironmentImpl) {
+            final RuntimeExecutionMode mode =
+                    tableEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+            if (mode == RuntimeExecutionMode.AUTOMATIC) {
+                throw new RuntimeException(
+                        String.format("Runtime execution mode '%s' is not supported yet.", mode));
+            }
+            return mode == RuntimeExecutionMode.STREAMING;
         } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Could not determine the planner type for table environment class %s.",
-                            tableEnv.getClass()));
-        }
-    }
-
-    private static boolean isStreamingMode(Table table) throws Exception {
-        TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
-        if (tableEnv instanceof BatchTableEnvironment || tableEnv instanceof BatchTableEnvImpl) {
             return false;
-        } else if (tableEnv instanceof TableEnvironmentImpl) {
-            java.lang.reflect.Field isStreamingModeMethod =
-                    TableEnvironmentImpl.class.getDeclaredField("isStreamingMode");
-            isStreamingModeMethod.setAccessible(true);
-            return (boolean) isStreamingModeMethod.get(tableEnv);
-        } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Could not determine the streaming mode for table environment class %s",
-                            tableEnv.getClass()));
         }
     }
 
-    private static boolean isAppendOnlyTable(Table table) throws Exception {
+    private static boolean isAppendOnlyTable(Table table) {
         if (isStreamingMode(table)) {
             TableEnvironmentImpl tableEnv =
                     (TableEnvironmentImpl) ((TableImpl) table).getTableEnvironment();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
deleted file mode 100644
index a83034d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Arrays;
-
-/**
- * The abstract base {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction}
- * functions for the old planner.
- */
-@Internal
-public abstract class AbstractPythonScalarFunctionFlatMap
-        extends AbstractPythonStatelessFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1";
-
-    /** The Python {@link ScalarFunction}s to be executed. */
-    public final PythonFunctionInfo[] scalarFunctions;
-
-    /** The offset of the fields which should be forwarded. */
-    private final int[] forwardedFields;
-
-    public AbstractPythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, inputType, outputType, udfInputOffsets);
-        this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
-        this.forwardedFields = Preconditions.checkNotNull(forwardedFields);
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        RowTypeInfo forwardedInputTypeInfo =
-                new RowTypeInfo(
-                        Arrays.stream(forwardedFields)
-                                .mapToObj(i -> inputType.getFields().get(i))
-                                .map(RowType.RowField::getType)
-                                .map(TypeConversions::fromLogicalToDataType)
-                                .map(TypeConversions::fromDataTypeToLegacyInfo)
-                                .toArray(TypeInformation[]::new));
-        forwardedInputSerializer =
-                forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-    }
-
-    @Override
-    public PythonEnv getPythonEnv() {
-        return scalarFunctions[0].getPythonFunction().getPythonEnv();
-    }
-
-    @Override
-    public void bufferInput(Row input) {
-        Row forwardedFieldsRow = Row.project(input, forwardedFields);
-        if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
-            forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
-        }
-        forwardedInputQueue.add(forwardedFieldsRow);
-    }
-
-    @Override
-    public int getForwardedFieldsCount() {
-        return forwardedFields.length;
-    }
-
-    @Override
-    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
-        FlinkFnApi.UserDefinedFunctions.Builder builder =
-                FlinkFnApi.UserDefinedFunctions.newBuilder();
-        // add udf proto
-        for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) {
-            builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
-        }
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        return builder.build();
-    }
-
-    @Override
-    public String getFunctionUrn() {
-        return SCALAR_FUNCTION_URN;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
deleted file mode 100644
index 62ca509..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
-import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-/**
- * Base Python stateless {@link RichFlatMapFunction} used to invoke Python stateless functions for
- * the old planner.
- */
-@Internal
-public abstract class AbstractPythonStatelessFunctionFlatMap extends RichFlatMapFunction<Row, Row>
-        implements ResultTypeQueryable<Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(AbstractPythonStatelessFunctionFlatMap.class);
-
-    /** The python config. */
-    private final PythonConfig config;
-
-    /** The offsets of user-defined function inputs. */
-    private final int[] userDefinedFunctionInputOffsets;
-
-    /** The input logical type. */
-    protected final RowType inputType;
-
-    /** The output logical type. */
-    protected final RowType outputType;
-
-    /** The options used to configure the Python worker process. */
-    protected final Map<String, String> jobOptions;
-
-    /** The user-defined function input logical type. */
-    protected transient RowType userDefinedFunctionInputType;
-
-    /** The user-defined function output logical type. */
-    protected transient RowType userDefinedFunctionOutputType;
-
-    /**
-     * The queue holding the input elements for which the execution results have not been received.
-     */
-    protected transient LinkedBlockingQueue<Row> forwardedInputQueue;
-
-    /** Max number of elements to include in a bundle. */
-    private transient int maxBundleSize;
-
-    /** Number of processed elements in the current bundle. */
-    private transient int elementCount;
-
-    /** OutputStream Wrapper. */
-    transient DataOutputViewStreamWrapper baosWrapper;
-
-    /** The collector used to collect records. */
-    protected transient Collector<Row> resultCollector;
-
-    /**
-     * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
-     * execution.
-     */
-    protected transient PythonFunctionRunner pythonFunctionRunner;
-
-    /** Reusable InputStream used to holding the execution results to be deserialized. */
-    protected transient ByteArrayInputStreamWithPos bais;
-
-    /** InputStream Wrapper. */
-    protected transient DataInputViewStreamWrapper baisWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    protected transient TypeSerializer<Row> forwardedInputSerializer;
-
-    /** Reusable OutputStream used to holding the serialized input elements. */
-    protected transient ByteArrayOutputStreamWithPos baos;
-
-    public AbstractPythonStatelessFunctionFlatMap(
-            Configuration config,
-            RowType inputType,
-            RowType outputType,
-            int[] userDefinedFunctionInputOffsets) {
-        this.inputType = Preconditions.checkNotNull(inputType);
-        this.outputType = Preconditions.checkNotNull(outputType);
-        this.userDefinedFunctionInputOffsets =
-                Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
-        this.config = new PythonConfig(Preconditions.checkNotNull(config));
-        this.jobOptions = buildJobOptions(config);
-    }
-
-    protected PythonConfig getPythonConfig() {
-        return config;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        this.elementCount = 0;
-        this.maxBundleSize = config.getMaxBundleSize();
-        if (this.maxBundleSize <= 0) {
-            this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue();
-            LOG.error(
-                    "Invalid value for the maximum bundle size. Using default value of "
-                            + this.maxBundleSize
-                            + '.');
-        } else {
-            LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize);
-        }
-
-        if (config.getMaxBundleTimeMills() != PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()) {
-            LOG.info(
-                    "Maximum bundle time takes no effect in old planner under batch mode. "
-                            + "Config maximum bundle size instead! "
-                            + "Under batch mode, bundle size should be enough to control both throughput and latency.");
-        }
-        forwardedInputQueue = new LinkedBlockingQueue<>();
-        userDefinedFunctionInputType =
-                new RowType(
-                        Arrays.stream(userDefinedFunctionInputOffsets)
-                                .mapToObj(i -> inputType.getFields().get(i))
-                                .collect(Collectors.toList()));
-
-        bais = new ByteArrayInputStreamWithPos();
-        baisWrapper = new DataInputViewStreamWrapper(bais);
-
-        baos = new ByteArrayOutputStreamWithPos();
-        baosWrapper = new DataOutputViewStreamWrapper(baos);
-
-        userDefinedFunctionOutputType =
-                new RowType(
-                        outputType
-                                .getFields()
-                                .subList(getForwardedFieldsCount(), outputType.getFieldCount()));
-
-        this.pythonFunctionRunner = createPythonFunctionRunner();
-        this.pythonFunctionRunner.open(config);
-    }
-
-    @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
-        this.resultCollector = out;
-        bufferInput(value);
-        processElementInternal(value);
-        checkInvokeFinishBundleByCount();
-        emitResults();
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>)
-                LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
-                        LogicalTypeDataTypeConverter.toDataType(outputType));
-    }
-
-    @Override
-    public void close() throws Exception {
-        try {
-            invokeFinishBundle();
-
-            if (pythonFunctionRunner != null) {
-                pythonFunctionRunner.close();
-                pythonFunctionRunner = null;
-            }
-        } finally {
-            super.close();
-        }
-    }
-
-    /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager.. */
-    public abstract PythonEnv getPythonEnv();
-
-    public abstract void bufferInput(Row input);
-
-    public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
-
-    public abstract int getForwardedFieldsCount();
-
-    /** Gets the proto representation of the Python user-defined functions to be executed. */
-    public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
-
-    public abstract String getInputOutputCoderUrn();
-
-    public abstract String getFunctionUrn();
-
-    public abstract void processElementInternal(Row value) throws Exception;
-
-    /** Checks whether to invoke finishBundle by elements count. Called in flatMap. */
-    protected void checkInvokeFinishBundleByCount() throws Exception {
-        elementCount++;
-        if (elementCount >= maxBundleSize) {
-            invokeFinishBundle();
-        }
-    }
-
-    protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
-        PythonDependencyInfo dependencyInfo =
-                PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache());
-        PythonEnv pythonEnv = getPythonEnv();
-        if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
-            return new ProcessPythonEnvironmentManager(
-                    dependencyInfo,
-                    ConfigurationUtils.splitPaths(System.getProperty("java.io.tmpdir")),
-                    System.getenv());
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Execution type '%s' is not supported.", pythonEnv.getExecType()));
-        }
-    }
-
-    protected FlinkMetricContainer getFlinkMetricContainer() {
-        return this.config.isMetricEnabled()
-                ? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
-                : null;
-    }
-
-    protected Row getFunctionInput(Row element) {
-        return Row.project(element, userDefinedFunctionInputOffsets);
-    }
-
-    private void emitResults() throws Exception {
-        Tuple2<byte[], Integer> resultTuple;
-        while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
-            emitResult(resultTuple);
-        }
-    }
-
-    protected void invokeFinishBundle() throws Exception {
-        if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
-            emitResults();
-        }
-    }
-
-    private PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-        return new BeamTableStatelessPythonFunctionRunner(
-                getRuntimeContext().getTaskName(),
-                createPythonEnvironmentManager(),
-                userDefinedFunctionInputType,
-                userDefinedFunctionOutputType,
-                getFunctionUrn(),
-                getUserDefinedFunctionsProto(),
-                getInputOutputCoderUrn(),
-                jobOptions,
-                getFlinkMetricContainer(),
-                null,
-                0.0,
-                FlinkFnApi.CoderParam.OutputMode.SINGLE);
-    }
-
-    private Map<String, String> buildJobOptions(Configuration config) {
-        Map<String, String> jobOptions = new HashMap<>();
-        if (config.containsKey("table.exec.timezone")) {
-            jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null));
-        }
-        return jobOptions;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
deleted file mode 100644
index e1fc082..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction} functions for the
- * old planner.
- */
-@Internal
-public class PythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCALAR_FUNCTION_SCHEMA_CODER_URN =
-            "flink:coder:schema:scalar_function:v1";
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
-    /** The TypeSerializer for user-defined function execution results. */
-    private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
-    public PythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        userDefinedFunctionInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        userDefinedFunctionOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCALAR_FUNCTION_SCHEMA_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
-        byte[] rawUdfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        Row input = forwardedInputQueue.poll();
-        bais.setBuffer(rawUdfResult, 0, length);
-        Row udfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
-        this.resultCollector.collect(Row.join(input, udfResult));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
deleted file mode 100644
index fd68938..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link TableFunction} functions for the old
- * planner.
- */
-@Internal
-public final class PythonTableFunctionFlatMap extends AbstractPythonStatelessFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String TABLE_FUNCTION_SCHEMA_CODER_URN =
-            "flink:coder:schema:table_function:v1";
-
-    private static final String TABLE_FUNCTION_URN = "flink:transform:table_function:v1";
-
-    /** The Python {@link TableFunction} to be executed. */
-    private final PythonFunctionInfo tableFunction;
-
-    /** The correlate join type. */
-    private final JoinRelType joinType;
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
-    /** The TypeSerializer for user-defined function execution results. */
-    private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
-    public PythonTableFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udtfInputOffsets,
-            JoinRelType joinType) {
-        super(config, inputType, outputType, udtfInputOffsets);
-        this.tableFunction = Preconditions.checkNotNull(tableFunction);
-        Preconditions.checkArgument(
-                joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT,
-                "The join type should be inner join or left join");
-        this.joinType = joinType;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        RowTypeInfo forwardedInputTypeInfo =
-                (RowTypeInfo)
-                        TypeConversions.fromDataTypeToLegacyInfo(
-                                TypeConversions.fromLogicalToDataType(inputType));
-        forwardedInputSerializer =
-                forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-        userDefinedFunctionInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        userDefinedFunctionOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public PythonEnv getPythonEnv() {
-        return tableFunction.getPythonFunction().getPythonEnv();
-    }
-
-    @Override
-    public void bufferInput(Row input) {
-        // If the input node is a DataSetCalc node, the RichFlatMapFunction generated by codegen
-        // will reuse the output Row, so here we always copy the input Row to solve this problem.
-        input = forwardedInputSerializer.copy(input);
-        forwardedInputQueue.add(input);
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        Row input = forwardedInputQueue.poll();
-        byte[] rawUdtfResult;
-        int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
-        Row udtfResult;
-        do {
-            rawUdtfResult = resultTuple.f0;
-            length = resultTuple.f1;
-            isFinishResult = isFinishResult(rawUdtfResult, length);
-            if (!isFinishResult) {
-                bais.setBuffer(rawUdtfResult, 0, length);
-                udtfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
-                this.resultCollector.collect(Row.join(input, udtfResult));
-                resultTuple = pythonFunctionRunner.pollResult();
-                hasJoined = true;
-            } else if (joinType == JoinRelType.LEFT && !hasJoined) {
-                udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
-                for (int i = 0; i < udtfResult.getArity(); i++) {
-                    udtfResult.setField(0, null);
-                }
-                this.resultCollector.collect(Row.join(input, udtfResult));
-            }
-        } while (!isFinishResult);
-    }
-
-    @Override
-    public int getForwardedFieldsCount() {
-        return inputType.getFieldCount();
-    }
-
-    @Override
-    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
-        FlinkFnApi.UserDefinedFunctions.Builder builder =
-                FlinkFnApi.UserDefinedFunctions.newBuilder();
-        builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(tableFunction));
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        return builder.build();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return TABLE_FUNCTION_SCHEMA_CODER_URN;
-    }
-
-    @Override
-    public String getFunctionUrn() {
-        return TABLE_FUNCTION_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    private boolean isFinishResult(byte[] rawUdtfResult, int length) {
-        return length == 1 && rawUdtfResult[0] == 0x00;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
deleted file mode 100644
index d7253c5..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.functions.python.AbstractPythonScalarFunctionFlatMap;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Arrow Python {@link ScalarFunction} functions for
- * the old planner.
- */
-@Internal
-public final class ArrowPythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
-    /** The current number of elements to be included in an arrow batch. */
-    private transient int currentBatchCount;
-
-    /** Max number of elements to include in an arrow batch. */
-    private final int maxArrowBatchSize;
-
-    private transient ArrowSerializer<Row> arrowSerializer;
-
-    public ArrowPythonScalarFunctionFlatMap(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        maxArrowBatchSize = getPythonConfig().getMaxArrowBatchSize();
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        arrowSerializer =
-                new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
-        arrowSerializer.open(bais, baos);
-        currentBatchCount = 0;
-    }
-
-    @Override
-    public void close() throws Exception {
-        invokeCurrentBatch();
-        try {
-            super.close();
-        } finally {
-            if (arrowSerializer != null) {
-                arrowSerializer.close();
-                arrowSerializer = null;
-            }
-        }
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
-        byte[] udfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        bais.setBuffer(udfResult, 0, length);
-        int rowCount = arrowSerializer.load();
-        for (int i = 0; i < rowCount; i++) {
-            resultCollector.collect(Row.join(forwardedInputQueue.poll(), arrowSerializer.read(i)));
-        }
-        arrowSerializer.resetReader();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCHEMA_ARROW_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(Row value) throws Exception {
-        arrowSerializer.write(getFunctionInput(value));
-        currentBatchCount++;
-        if (currentBatchCount >= maxArrowBatchSize) {
-            invokeCurrentBatch();
-        }
-    }
-
-    @Override
-    protected void invokeFinishBundle() throws Exception {
-        invokeCurrentBatch();
-        super.invokeFinishBundle();
-    }
-
-    private void invokeCurrentBatch() throws Exception {
-        if (currentBatchCount > 0) {
-            arrowSerializer.finishCurrentBatch();
-            currentBatchCount = 0;
-            pythonFunctionRunner.process(baos.toByteArray());
-            checkInvokeFinishBundleByCount();
-            baos.reset();
-            arrowSerializer.resetWriter();
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
deleted file mode 100644
index 507f6f9..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-/** Base Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public abstract class AbstractRowPythonScalarFunctionOperator
-        extends AbstractPythonScalarFunctionOperator<CRow, CRow, Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The collector used to collect records. */
-    protected transient StreamRecordCRowWrappingCollector cRowWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    private transient TypeSerializer<CRow> forwardedInputSerializer;
-
-    public AbstractRowPythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
-
-        CRowTypeInfo forwardedInputTypeInfo =
-                new CRowTypeInfo(
-                        new RowTypeInfo(
-                                Arrays.stream(forwardedFields)
-                                        .mapToObj(i -> inputType.getFields().get(i))
-                                        .map(RowType.RowField::getType)
-                                        .map(TypeConversions::fromLogicalToDataType)
-                                        .map(TypeConversions::fromDataTypeToLegacyInfo)
-                                        .toArray(TypeInformation[]::new)));
-        forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
-    }
-
-    @Override
-    public void bufferInput(CRow input) {
-        CRow forwardedFieldsRow =
-                new CRow(Row.project(input.row(), forwardedFields), input.change());
-        if (getExecutionConfig().isObjectReuseEnabled()) {
-            forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
-        }
-        forwardedInputQueue.add(forwardedFieldsRow);
-    }
-
-    @Override
-    public Row getFunctionInput(CRow element) {
-        return Row.project(element.row(), userDefinedFunctionInputOffsets);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
deleted file mode 100644
index b458850..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** The Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public class PythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The TypeSerializer for udf execution results. */
-    private transient TypeSerializer<Row> udfOutputTypeSerializer;
-
-    /** The TypeSerializer for udf input elements. */
-    private transient TypeSerializer<Row> udfInputTypeSerializer;
-
-    public PythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open() throws Exception {
-        super.open();
-        udfInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-        udfOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        udfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        byte[] rawUdfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        CRow input = forwardedInputQueue.poll();
-        cRowWrapper.setChange(input.change());
-        bais.setBuffer(rawUdfResult, 0, length);
-        Row udfResult = udfOutputTypeSerializer.deserialize(baisWrapper);
-        cRowWrapper.collect(Row.join(input.row(), udfResult));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
deleted file mode 100644
index d3ce04a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** Arrow Python {@link ScalarFunction} operator for the old planner. */
-@Internal
-public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
-    /** The current number of elements to be included in an arrow batch. */
-    private transient int currentBatchCount;
-
-    /** Max number of elements to include in an arrow batch. */
-    private transient int maxArrowBatchSize;
-
-    private transient ArrowSerializer<Row> arrowSerializer;
-
-    public ArrowPythonScalarFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
-        arrowSerializer =
-                new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
-        arrowSerializer.open(bais, baos);
-        currentBatchCount = 0;
-    }
-
-    @Override
-    protected void invokeFinishBundle() throws Exception {
-        invokeCurrentBatch();
-        super.invokeFinishBundle();
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (arrowSerializer != null) {
-            arrowSerializer.close();
-            arrowSerializer = null;
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        invokeCurrentBatch();
-        super.close();
-    }
-
-    @Override
-    public void endInput() throws Exception {
-        invokeCurrentBatch();
-        super.endInput();
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        byte[] udfResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        bais.setBuffer(udfResult, 0, length);
-        int rowCount = arrowSerializer.load();
-        for (int i = 0; i < rowCount; i++) {
-            CRow input = forwardedInputQueue.poll();
-            cRowWrapper.setChange(input.change());
-            cRowWrapper.collect(Row.join(input.row(), arrowSerializer.read(i)));
-        }
-        arrowSerializer.resetReader();
-    }
-
-    @Override
-    public String getInputOutputCoderUrn() {
-        return SCHEMA_ARROW_CODER_URN;
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        arrowSerializer.write(getFunctionInput(value));
-        currentBatchCount++;
-        if (currentBatchCount >= maxArrowBatchSize) {
-            invokeCurrentBatch();
-        }
-    }
-
-    private void invokeCurrentBatch() throws Exception {
-        if (currentBatchCount > 0) {
-            arrowSerializer.finishCurrentBatch();
-            currentBatchCount = 0;
-            pythonFunctionRunner.process(baos.toByteArray());
-            checkInvokeFinishBundleByCount();
-            baos.reset();
-            arrowSerializer.resetWriter();
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
deleted file mode 100644
index d6282dc..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/** The Python {@link TableFunction} operator for the legacy planner. */
-@Internal
-public class PythonTableFunctionOperator
-        extends AbstractPythonTableFunctionOperator<CRow, CRow, Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The collector used to collect records. */
-    private transient StreamRecordCRowWrappingCollector cRowWrapper;
-
-    /** The type serializer for the forwarded fields. */
-    private transient TypeSerializer<CRow> forwardedInputSerializer;
-
-    /** The TypeSerializer for udtf execution results. */
-    private transient TypeSerializer<Row> udtfOutputTypeSerializer;
-
-    /** The TypeSerializer for udtf input elements. */
-    private transient TypeSerializer<Row> udtfInputTypeSerializer;
-
-    public PythonTableFunctionOperator(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udtfInputOffsets,
-            JoinRelType joinType) {
-        super(
-                config,
-                tableFunction,
-                inputType,
-                outputType,
-                udtfInputOffsets,
-                JoinTypeUtil.getFlinkJoinType(joinType));
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void open() throws Exception {
-        super.open();
-        this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
-        CRowTypeInfo forwardedInputTypeInfo =
-                new CRowTypeInfo(
-                        (RowTypeInfo)
-                                TypeConversions.fromDataTypeToLegacyInfo(
-                                        TypeConversions.fromLogicalToDataType(inputType)));
-        forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
-        udtfOutputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
-        udtfInputTypeSerializer =
-                PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
-    }
-
-    @Override
-    @SuppressWarnings("ConstantConditions")
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
-        CRow input = forwardedInputQueue.poll();
-        byte[] rawUdtfResult;
-        int length;
-        boolean isFinishResult;
-        boolean hasJoined = false;
-        Row udtfResult;
-        do {
-            rawUdtfResult = resultTuple.f0;
-            length = resultTuple.f1;
-            isFinishResult = isFinishResult(rawUdtfResult, length);
-            if (!isFinishResult) {
-                bais.setBuffer(rawUdtfResult, 0, length);
-                udtfResult = udtfOutputTypeSerializer.deserialize(baisWrapper);
-                cRowWrapper.setChange(input.change());
-                cRowWrapper.collect(Row.join(input.row(), udtfResult));
-                resultTuple = pythonFunctionRunner.pollResult();
-                hasJoined = true;
-            } else if (joinType == FlinkJoinType.LEFT && !hasJoined) {
-                udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
-                for (int i = 0; i < udtfResult.getArity(); i++) {
-                    udtfResult.setField(0, null);
-                }
-                cRowWrapper.setChange(input.change());
-                cRowWrapper.collect(Row.join(input.row(), udtfResult));
-            }
-        } while (!isFinishResult);
-    }
-
-    @Override
-    public void bufferInput(CRow input) {
-        if (getExecutionConfig().isObjectReuseEnabled()) {
-            input = forwardedInputSerializer.copy(input);
-        }
-        forwardedInputQueue.add(input);
-    }
-
-    @Override
-    public Row getFunctionInput(CRow element) {
-        return Row.project(element.row(), userDefinedFunctionInputOffsets);
-    }
-
-    @Override
-    public void processElementInternal(CRow value) throws Exception {
-        udtfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
deleted file mode 100644
index 502de8e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.utils;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
-/** The collector is used to convert a {@link Row} to a {@link CRow}. */
-public class StreamRecordCRowWrappingCollector implements Collector<Row> {
-
-    private final Collector<StreamRecord<CRow>> out;
-    private final CRow reuseCRow = new CRow();
-
-    /** For Table API & SQL jobs, the timestamp field is not used. */
-    private final StreamRecord<CRow> reuseStreamRecord = new StreamRecord<>(reuseCRow);
-
-    public StreamRecordCRowWrappingCollector(Collector<StreamRecord<CRow>> out) {
-        this.out = out;
-    }
-
-    public void setChange(boolean change) {
-        this.reuseCRow.change_$eq(change);
-    }
-
-    @Override
-    public void collect(Row record) {
-        reuseCRow.row_$eq(record);
-        out.collect(reuseStreamRecord);
-    }
-
-    @Override
-    public void close() {
-        out.close();
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
index c8824fe..7676bfc 100644
--- a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
+++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.flink.client.python;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.FileUtils;
 
@@ -45,7 +43,6 @@ import static org.apache.flink.table.api.Expressions.call;
 public class PythonFunctionFactoryTest {
 
     private static String tmpdir = "";
-    private static BatchTableEnvironment flinkTableEnv;
     private static StreamTableEnvironment blinkTableEnv;
     private static Table flinkSourceTable;
     private static Table blinkSourceTable;
@@ -72,13 +69,6 @@ public class PythonFunctionFactoryTest {
                             + "    return str + str\n";
             out.write(code.getBytes());
         }
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        flinkTableEnv = BatchTableEnvironment.create(env);
-        flinkTableEnv
-                .getConfig()
-                .getConfiguration()
-                .set(PYTHON_FILES, pyFilePath.getAbsolutePath());
-        flinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
         StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         blinkTableEnv =
                 StreamTableEnvironment.create(
@@ -92,7 +82,6 @@ public class PythonFunctionFactoryTest {
                 .getConfiguration()
                 .set(PYTHON_FILES, pyFilePath.getAbsolutePath());
         blinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
-        flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
         blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
     }
 
@@ -102,24 +91,6 @@ public class PythonFunctionFactoryTest {
     }
 
     public static void testPythonFunctionFactory() {
-        // flink catalog
-        flinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink catalog
-        flinkTableEnv.executeSql("alter function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink temporary catalog
-        flinkTableEnv.executeSql(
-                "create temporary function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
-        // flink temporary system
-        flinkTableEnv.executeSql(
-                "create temporary system function func1 as 'test1.func1' language python");
-        verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
         // blink catalog
         blinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
         verifyPlan(blinkSourceTable.select(call("func1", $("str"))), blinkTableEnv);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6f4cd18..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonScalarFunctionOperator}. */
-public class PythonScalarFunctionOperatorTest
-        extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
-    @Override
-    public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        return new PassThroughPythonScalarFunctionOperator(
-                config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    @Override
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    @Override
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    @Override
-    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
-        return StreamTableEnvironment.create(env);
-    }
-
-    @Override
-    public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
-        // If set to null, PojoSerializer is used by default which works well here.
-        return null;
-    }
-
-    private static class PassThroughPythonScalarFunctionOperator
-            extends PythonScalarFunctionOperator {
-
-        PassThroughPythonScalarFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo[] scalarFunctions,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                int[] forwardedFields) {
-            super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonScalarFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6274a83..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link ArrowPythonScalarFunctionOperator}. */
-public class ArrowPythonScalarFunctionOperatorTest
-        extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
-    public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo[] scalarFunctions,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            int[] forwardedFields) {
-        return new PassThroughArrowPythonScalarFunctionOperator(
-                config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-    }
-
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
-        return StreamTableEnvironment.create(env);
-    }
-
-    @Override
-    public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
-        // If set to null, PojoSerializer is used by default which works well here.
-        return null;
-    }
-
-    private static class PassThroughArrowPythonScalarFunctionOperator
-            extends ArrowPythonScalarFunctionOperator {
-
-        PassThroughArrowPythonScalarFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo[] scalarFunctions,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                int[] forwardedFields) {
-            super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonScalarFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
deleted file mode 100644
index 1b3cc01..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonTableFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonTableFunctionOperator}. */
-public class PythonTableFunctionOperatorTest
-        extends PythonTableFunctionOperatorTestBase<CRow, CRow, Row> {
-    @Override
-    public AbstractPythonTableFunctionOperator<CRow, CRow, Row> getTestOperator(
-            Configuration config,
-            PythonFunctionInfo tableFunction,
-            RowType inputType,
-            RowType outputType,
-            int[] udfInputOffsets,
-            JoinRelType joinRelType) {
-        return new PassThroughPythonTableFunctionOperator(
-                config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
-    }
-
-    @Override
-    public CRow newRow(boolean accumulateMsg, Object... fields) {
-        return new CRow(Row.of(fields), accumulateMsg);
-    }
-
-    @Override
-    public void assertOutputEquals(
-            String message, Collection<Object> expected, Collection<Object> actual) {
-        TestHarnessUtil.assertOutputEquals(
-                message, (Queue<Object>) expected, (Queue<Object>) actual);
-    }
-
-    private static class PassThroughPythonTableFunctionOperator
-            extends PythonTableFunctionOperator {
-
-        PassThroughPythonTableFunctionOperator(
-                Configuration config,
-                PythonFunctionInfo tableFunction,
-                RowType inputType,
-                RowType outputType,
-                int[] udfInputOffsets,
-                JoinRelType joinRelType) {
-            super(config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
-        }
-
-        @Override
-        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-            return new PassThroughPythonTableFunctionRunner(
-                    getRuntimeContext().getTaskName(),
-                    PythonTestUtils.createTestEnvironmentManager(),
-                    userDefinedFunctionInputType,
-                    userDefinedFunctionOutputType,
-                    getFunctionUrn(),
-                    getUserDefinedFunctionsProto(),
-                    getInputOutputCoderUrn(),
-                    new HashMap<>(),
-                    PythonTestUtils.createMockFlinkMetricContainer());
-        }
-    }
-}
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index ce7d02c..2cc04f6 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -53,7 +53,7 @@ max-line-length=100
 exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*,pyflink/shell.py,.eggs/*,pyflink/fn_execution/tests/process_mode_test_data.py,pyflink/fn_execution/*_pb2.py
 
 [mypy]
-files=pyflink/common/*.py,pyflink/table/*.py,pyflink/dataset/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
+files=pyflink/common/*.py,pyflink/table/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
 ignore_missing_imports = True
 strict_optional=False