You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 13:21:55 UTC

[flink] 02/02: [FLINK-22619][python] Drop Python UDF serializers for old planner

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2afa88d910bc5d99e95c8f9069fb13f5f75edd5
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Thu May 27 20:01:26 2021 +0800

    [FLINK-22619][python] Drop Python UDF serializers for old planner
    
    This closes #16008.
---
 .../flink/table/runtime/arrow/ArrowUtils.java      | 171 +---------------
 .../runtime/arrow/readers/ArrayFieldReader.java    |  82 --------
 .../runtime/arrow/readers/ArrowFieldReader.java    |  48 -----
 .../runtime/arrow/readers/BigIntFieldReader.java   |  37 ----
 .../runtime/arrow/readers/BooleanFieldReader.java  |  37 ----
 .../runtime/arrow/readers/DateFieldReader.java     |  44 ----
 .../runtime/arrow/readers/DecimalFieldReader.java  |  39 ----
 .../runtime/arrow/readers/DoubleFieldReader.java   |  37 ----
 .../runtime/arrow/readers/FloatFieldReader.java    |  37 ----
 .../runtime/arrow/readers/IntFieldReader.java      |  37 ----
 .../runtime/arrow/readers/RowArrowReader.java      |  55 -----
 .../runtime/arrow/readers/RowFieldReader.java      |  50 -----
 .../runtime/arrow/readers/SmallIntFieldReader.java |  37 ----
 .../runtime/arrow/readers/TimeFieldReader.java     |  68 -------
 .../arrow/readers/TimestampFieldReader.java        |  66 ------
 .../runtime/arrow/readers/TinyIntFieldReader.java  |  37 ----
 .../arrow/readers/VarBinaryFieldReader.java        |  37 ----
 .../runtime/arrow/readers/VarCharFieldReader.java  |  43 ----
 .../arrow/serializers/RowArrowSerializer.java      |  46 -----
 .../arrow/sources/RowArrowSourceFunction.java      |  52 -----
 .../runtime/arrow/sources/RowArrowTableSource.java |  38 ----
 .../table/runtime/typeutils/PythonTypeUtils.java   | 144 +------------
 .../flink/table/runtime/arrow/ArrowUtilsTest.java  | 124 +++--------
 .../runtime/arrow/RowArrowReaderWriterTest.java    | 226 ---------------------
 .../arrow/sources/RowArrowSourceFunctionTest.java  |  87 --------
 .../runtime/typeutils/PythonTypeUtilsTest.java     |  26 +--
 26 files changed, 40 insertions(+), 1665 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index 1264247..1821f5c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -32,26 +32,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
 import org.apache.flink.table.runtime.arrow.sources.AbstractArrowTableSource;
 import org.apache.flink.table.runtime.arrow.sources.ArrowTableSource;
-import org.apache.flink.table.runtime.arrow.sources.RowArrowTableSource;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -77,21 +59,6 @@ import org.apache.flink.table.runtime.arrow.writers.DecimalWriter;
 import org.apache.flink.table.runtime.arrow.writers.DoubleWriter;
 import org.apache.flink.table.runtime.arrow.writers.FloatWriter;
 import org.apache.flink.table.runtime.arrow.writers.IntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowArrayWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBigIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBooleanWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDateWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDecimalWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDoubleWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowFloatWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowRowWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowSmallIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimeWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimestampWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTinyIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarBinaryWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarCharWriter;
 import org.apache.flink.table.runtime.arrow.writers.RowWriter;
 import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
 import org.apache.flink.table.runtime.arrow.writers.TimeWriter;
@@ -238,73 +205,6 @@ public final class ArrowUtils {
         return new Field(fieldName, fieldType, children);
     }
 
-    /** Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. */
-    public static ArrowWriter<Row> createRowArrowWriter(VectorSchemaRoot root, RowType rowType) {
-        ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
-        List<FieldVector> vectors = root.getFieldVectors();
-        for (int i = 0; i < vectors.size(); i++) {
-            FieldVector vector = vectors.get(i);
-            vector.allocateNew();
-            fieldWriters[i] = createRowArrowFieldWriter(vector, rowType.getTypeAt(i));
-        }
-
-        return new ArrowWriter<>(root, fieldWriters);
-    }
-
-    private static ArrowFieldWriter<Row> createRowArrowFieldWriter(
-            ValueVector vector, LogicalType fieldType) {
-        if (vector instanceof TinyIntVector) {
-            return new RowTinyIntWriter((TinyIntVector) vector);
-        } else if (vector instanceof SmallIntVector) {
-            return new RowSmallIntWriter((SmallIntVector) vector);
-        } else if (vector instanceof IntVector) {
-            return new RowIntWriter((IntVector) vector);
-        } else if (vector instanceof BigIntVector) {
-            return new RowBigIntWriter((BigIntVector) vector);
-        } else if (vector instanceof BitVector) {
-            return new RowBooleanWriter((BitVector) vector);
-        } else if (vector instanceof Float4Vector) {
-            return new RowFloatWriter((Float4Vector) vector);
-        } else if (vector instanceof Float8Vector) {
-            return new RowDoubleWriter((Float8Vector) vector);
-        } else if (vector instanceof VarCharVector) {
-            return new RowVarCharWriter((VarCharVector) vector);
-        } else if (vector instanceof VarBinaryVector) {
-            return new RowVarBinaryWriter((VarBinaryVector) vector);
-        } else if (vector instanceof DecimalVector) {
-            DecimalVector decimalVector = (DecimalVector) vector;
-            return new RowDecimalWriter(
-                    decimalVector, getPrecision(decimalVector), decimalVector.getScale());
-        } else if (vector instanceof DateDayVector) {
-            return new RowDateWriter((DateDayVector) vector);
-        } else if (vector instanceof TimeSecVector
-                || vector instanceof TimeMilliVector
-                || vector instanceof TimeMicroVector
-                || vector instanceof TimeNanoVector) {
-            return new RowTimeWriter(vector);
-        } else if (vector instanceof TimeStampVector
-                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
-            return new RowTimestampWriter(vector);
-        } else if (vector instanceof ListVector) {
-            ListVector listVector = (ListVector) vector;
-            LogicalType elementType = ((ArrayType) fieldType).getElementType();
-            return new RowArrayWriter(
-                    listVector, createRowArrowFieldWriter(listVector.getDataVector(), elementType));
-        } else if (vector instanceof StructVector) {
-            RowType rowType = (RowType) fieldType;
-            ArrowFieldWriter<Row>[] fieldsWriters = new ArrowFieldWriter[rowType.getFieldCount()];
-            for (int i = 0; i < fieldsWriters.length; i++) {
-                fieldsWriters[i] =
-                        createRowArrowFieldWriter(
-                                ((StructVector) vector).getVectorById(i), rowType.getTypeAt(i));
-            }
-            return new RowRowWriter((StructVector) vector, fieldsWriters);
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported type %s.", fieldType));
-        }
-    }
-
     /**
      * Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}.
      */
@@ -446,71 +346,6 @@ public final class ArrowUtils {
         }
     }
 
-    /** Creates a {@link RowArrowReader} for the specified {@link VectorSchemaRoot}. */
-    public static RowArrowReader createRowArrowReader(VectorSchemaRoot root, RowType rowType) {
-        List<ArrowFieldReader> fieldReaders = new ArrayList<>();
-        List<FieldVector> fieldVectors = root.getFieldVectors();
-        for (int i = 0; i < fieldVectors.size(); i++) {
-            fieldReaders.add(createRowArrowFieldReader(fieldVectors.get(i), rowType.getTypeAt(i)));
-        }
-
-        return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0]));
-    }
-
-    public static ArrowFieldReader createRowArrowFieldReader(
-            ValueVector vector, LogicalType fieldType) {
-        if (vector instanceof TinyIntVector) {
-            return new TinyIntFieldReader((TinyIntVector) vector);
-        } else if (vector instanceof SmallIntVector) {
-            return new SmallIntFieldReader((SmallIntVector) vector);
-        } else if (vector instanceof IntVector) {
-            return new IntFieldReader((IntVector) vector);
-        } else if (vector instanceof BigIntVector) {
-            return new BigIntFieldReader((BigIntVector) vector);
-        } else if (vector instanceof BitVector) {
-            return new BooleanFieldReader((BitVector) vector);
-        } else if (vector instanceof Float4Vector) {
-            return new FloatFieldReader((Float4Vector) vector);
-        } else if (vector instanceof Float8Vector) {
-            return new DoubleFieldReader((Float8Vector) vector);
-        } else if (vector instanceof VarCharVector) {
-            return new VarCharFieldReader((VarCharVector) vector);
-        } else if (vector instanceof VarBinaryVector) {
-            return new VarBinaryFieldReader((VarBinaryVector) vector);
-        } else if (vector instanceof DecimalVector) {
-            return new DecimalFieldReader((DecimalVector) vector);
-        } else if (vector instanceof DateDayVector) {
-            return new DateFieldReader((DateDayVector) vector);
-        } else if (vector instanceof TimeSecVector
-                || vector instanceof TimeMilliVector
-                || vector instanceof TimeMicroVector
-                || vector instanceof TimeNanoVector) {
-            return new TimeFieldReader(vector);
-        } else if (vector instanceof TimeStampVector
-                && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
-            return new TimestampFieldReader(vector);
-        } else if (vector instanceof ListVector) {
-            ListVector listVector = (ListVector) vector;
-            LogicalType elementType = ((ArrayType) fieldType).getElementType();
-            return new ArrayFieldReader(
-                    listVector,
-                    createRowArrowFieldReader(listVector.getDataVector(), elementType),
-                    elementType);
-        } else if (vector instanceof StructVector) {
-            StructVector structVector = (StructVector) vector;
-            ArrowFieldReader[] fieldReaders = new ArrowFieldReader[structVector.size()];
-            for (int i = 0; i < fieldReaders.length; i++) {
-                fieldReaders[i] =
-                        createRowArrowFieldReader(
-                                structVector.getVectorById(i), ((RowType) fieldType).getTypeAt(i));
-            }
-            return new RowFieldReader(structVector, fieldReaders);
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unsupported type %s.", fieldType));
-        }
-    }
-
     /**
      * Creates an {@link ArrowReader} for blink planner for the specified {@link VectorSchemaRoot}.
      */
@@ -580,11 +415,7 @@ public final class ArrowUtils {
     public static AbstractArrowTableSource createArrowTableSource(
             DataType dataType, String fileName) throws IOException {
         try (FileInputStream fis = new FileInputStream(fileName)) {
-            if (RowData.class.isAssignableFrom(dataType.getConversionClass())) {
-                return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
-            } else {
-                return new RowArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
-            }
+            return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
         }
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
deleted file mode 100644
index 32130a1..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.ListVector;
-
-import java.lang.reflect.Array;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Array. */
-@Internal
-public final class ArrayFieldReader extends ArrowFieldReader<Object[]> {
-
-    private final ArrowFieldReader arrayData;
-    private final Class<?> elementClass;
-
-    public ArrayFieldReader(
-            ListVector listVector, ArrowFieldReader arrayData, LogicalType elementType) {
-        super(listVector);
-        this.arrayData = Preconditions.checkNotNull(arrayData);
-        this.elementClass = getElementClass(elementType);
-    }
-
-    @Override
-    public Object[] read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            int startIndex = index * ListVector.OFFSET_WIDTH;
-            int start = getValueVector().getOffsetBuffer().getInt(startIndex);
-            int end =
-                    getValueVector().getOffsetBuffer().getInt(startIndex + ListVector.OFFSET_WIDTH);
-            Object[] result = (Object[]) Array.newInstance(elementClass, end - start);
-            for (int i = 0; i < result.length; i++) {
-                result[i] = arrayData.read(start + i);
-            }
-            return result;
-        }
-    }
-
-    private Class<?> getElementClass(LogicalType elementType) {
-        DataType dataType = TypeConversions.fromLogicalToDataType(elementType);
-        if (elementType instanceof TimestampType) {
-            // the default conversion class is java.time.LocalDateTime
-            dataType = dataType.bridgedTo(Timestamp.class);
-        } else if (elementType instanceof DateType) {
-            // the default conversion class is java.time.LocalDate
-            dataType = dataType.bridgedTo(Date.class);
-        } else if (elementType instanceof TimeType) {
-            // the default conversion class is java.time.LocalTime
-            dataType = dataType.bridgedTo(Time.class);
-        }
-        return dataType.getConversionClass();
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
deleted file mode 100644
index 6773b9e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.ValueVector;
-
-/**
- * Base class for arrow field reader.
- *
- * @param <OUT> Type of the value to read.
- */
-@Internal
-public abstract class ArrowFieldReader<OUT> {
-
-    /** Container which is used to store the sequence of values of a column to read. */
-    private final ValueVector valueVector;
-
-    public ArrowFieldReader(ValueVector valueVector) {
-        this.valueVector = Preconditions.checkNotNull(valueVector);
-    }
-
-    /** Returns the underlying container which stores the sequence of values of a column to read. */
-    public ValueVector getValueVector() {
-        return valueVector;
-    }
-
-    /** Reads the field value at the specified index. */
-    public abstract OUT read(int index);
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
deleted file mode 100644
index ef786db..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BigIntVector;
-
-/** {@link ArrowFieldReader} for BigInt. */
-@Internal
-public final class BigIntFieldReader extends ArrowFieldReader<Long> {
-
-    public BigIntFieldReader(BigIntVector bigIntVector) {
-        super(bigIntVector);
-    }
-
-    @Override
-    public Long read(int index) {
-        return ((BigIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
deleted file mode 100644
index 17ca9aa..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BitVector;
-
-/** {@link ArrowFieldReader} for Boolean. */
-@Internal
-public final class BooleanFieldReader extends ArrowFieldReader<Boolean> {
-
-    public BooleanFieldReader(BitVector bitVector) {
-        super(bitVector);
-    }
-
-    @Override
-    public Boolean read(int index) {
-        return ((BitVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
deleted file mode 100644
index 5eeb614..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-
-import org.apache.arrow.vector.DateDayVector;
-
-import java.sql.Date;
-
-/** {@link ArrowFieldReader} for Date. */
-@Internal
-public final class DateFieldReader extends ArrowFieldReader<Date> {
-
-    public DateFieldReader(DateDayVector dateDayVector) {
-        super(dateDayVector);
-    }
-
-    @Override
-    public Date read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            return SqlDateTimeUtils.internalToDate(((DateDayVector) getValueVector()).get(index));
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
deleted file mode 100644
index dc6744b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.DecimalVector;
-
-import java.math.BigDecimal;
-
-/** {@link ArrowFieldReader} for Decimal. */
-@Internal
-public final class DecimalFieldReader extends ArrowFieldReader<BigDecimal> {
-
-    public DecimalFieldReader(DecimalVector decimalVector) {
-        super(decimalVector);
-    }
-
-    @Override
-    public BigDecimal read(int index) {
-        return ((DecimalVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
deleted file mode 100644
index c7d7573..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float8Vector;
-
-/** {@link ArrowFieldReader} for Double. */
-@Internal
-public final class DoubleFieldReader extends ArrowFieldReader<Double> {
-
-    public DoubleFieldReader(Float8Vector doubleVector) {
-        super(doubleVector);
-    }
-
-    @Override
-    public Double read(int index) {
-        return ((Float8Vector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
deleted file mode 100644
index 3276074..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float4Vector;
-
-/** {@link ArrowFieldReader} for Float. */
-@Internal
-public final class FloatFieldReader extends ArrowFieldReader<Float> {
-
-    public FloatFieldReader(Float4Vector floatVector) {
-        super(floatVector);
-    }
-
-    @Override
-    public Float read(int index) {
-        return ((Float4Vector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
deleted file mode 100644
index 4e9a605..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.IntVector;
-
-/** {@link ArrowFieldReader} for Int. */
-@Internal
-public final class IntFieldReader extends ArrowFieldReader<Integer> {
-
-    public IntFieldReader(IntVector intVector) {
-        super(intVector);
-    }
-
-    @Override
-    public Integer read(int index) {
-        return ((IntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
deleted file mode 100644
index 2d1403d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/** {@link ArrowReader} which read the underlying Arrow format data as {@link Row}. */
-@Internal
-public final class RowArrowReader implements ArrowReader<Row> {
-
-    /**
-     * An array of readers which are responsible for the deserialization of each column of the rows.
-     */
-    private final ArrowFieldReader[] fieldReaders;
-
-    /** Reusable row used to hold the deserialized result. */
-    private final Row reuseRow;
-
-    public RowArrowReader(ArrowFieldReader[] fieldReaders) {
-        this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
-        this.reuseRow = new Row(fieldReaders.length);
-    }
-
-    /** Gets the field readers. */
-    public ArrowFieldReader[] getFieldReaders() {
-        return fieldReaders;
-    }
-
-    @Override
-    public Row read(int rowId) {
-        for (int i = 0; i < fieldReaders.length; i++) {
-            reuseRow.setField(i, fieldReaders[i].read(rowId));
-        }
-        return reuseRow;
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
deleted file mode 100644
index 673bd0a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.StructVector;
-
-/** {@link ArrowFieldReader} for Row. */
-@Internal
-public final class RowFieldReader extends ArrowFieldReader<Row> {
-
-    private final ArrowFieldReader[] fieldReaders;
-
-    public RowFieldReader(StructVector structVector, ArrowFieldReader[] fieldReaders) {
-        super(structVector);
-        this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
-    }
-
-    @Override
-    public Row read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            Row row = new Row(fieldReaders.length);
-            for (int i = 0; i < fieldReaders.length; i++) {
-                row.setField(i, fieldReaders[i].read(index));
-            }
-            return row;
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
deleted file mode 100644
index 160dc51..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.SmallIntVector;
-
-/** {@link ArrowFieldReader} for SmallInt. */
-@Internal
-public final class SmallIntFieldReader extends ArrowFieldReader<Short> {
-
-    public SmallIntFieldReader(SmallIntVector smallIntVector) {
-        super(smallIntVector);
-    }
-
-    @Override
-    public Short read(int index) {
-        return ((SmallIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
deleted file mode 100644
index d29b737..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.ValueVector;
-
-import java.sql.Time;
-import java.util.TimeZone;
-
-/** {@link ArrowFieldReader} for Time. */
-@Internal
-public final class TimeFieldReader extends ArrowFieldReader<Time> {
-
-    // The local time zone.
-    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
-    public TimeFieldReader(ValueVector valueVector) {
-        super(valueVector);
-        Preconditions.checkState(
-                valueVector instanceof TimeSecVector
-                        || valueVector instanceof TimeMilliVector
-                        || valueVector instanceof TimeMicroVector
-                        || valueVector instanceof TimeNanoVector);
-    }
-
-    @Override
-    public Time read(int index) {
-        ValueVector valueVector = getValueVector();
-        if (valueVector.isNull(index)) {
-            return null;
-        } else {
-            long timeMilli;
-            if (valueVector instanceof TimeSecVector) {
-                timeMilli = ((TimeSecVector) getValueVector()).get(index) * 1000;
-            } else if (valueVector instanceof TimeMilliVector) {
-                timeMilli = ((TimeMilliVector) getValueVector()).get(index);
-            } else if (valueVector instanceof TimeMicroVector) {
-                timeMilli = ((TimeMicroVector) getValueVector()).get(index) / 1000;
-            } else {
-                timeMilli = ((TimeNanoVector) getValueVector()).get(index) / 1000000;
-            }
-            return new Time(timeMilli - LOCAL_TZ.getOffset(timeMilli));
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
deleted file mode 100644
index 8dc29c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Timestamp. */
-@Internal
-public final class TimestampFieldReader extends ArrowFieldReader<Timestamp> {
-
-    public TimestampFieldReader(ValueVector valueVector) {
-        super(valueVector);
-        Preconditions.checkState(
-                valueVector instanceof TimeStampVector
-                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
-                                == null);
-    }
-
-    @Override
-    public Timestamp read(int i) {
-        ValueVector valueVector = getValueVector();
-        if (valueVector.isNull(i)) {
-            return null;
-        } else {
-            long millisecond;
-            if (valueVector instanceof TimeStampSecVector) {
-                millisecond = ((TimeStampSecVector) valueVector).get(i) * 1000;
-            } else if (valueVector instanceof TimeStampMilliVector) {
-                millisecond = ((TimeStampMilliVector) valueVector).get(i);
-            } else if (valueVector instanceof TimeStampMicroVector) {
-                millisecond = ((TimeStampMicroVector) valueVector).get(i) / 1000;
-            } else {
-                millisecond = ((TimeStampNanoVector) valueVector).get(i) / 1_000_000;
-            }
-            return PythonTypeUtils.internalToTimestamp(millisecond);
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
deleted file mode 100644
index f27f936..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.TinyIntVector;
-
-/** {@link ArrowFieldReader} for TinyInt. */
-@Internal
-public final class TinyIntFieldReader extends ArrowFieldReader<Byte> {
-
-    public TinyIntFieldReader(TinyIntVector tinyIntVector) {
-        super(tinyIntVector);
-    }
-
-    @Override
-    public Byte read(int index) {
-        return ((TinyIntVector) getValueVector()).getObject(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
deleted file mode 100644
index 354d031..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.VarBinaryVector;
-
-/** {@link ArrowFieldReader} for VarBinary. */
-@Internal
-public final class VarBinaryFieldReader extends ArrowFieldReader<byte[]> {
-
-    public VarBinaryFieldReader(VarBinaryVector varBinaryVector) {
-        super(varBinaryVector);
-    }
-
-    @Override
-    public byte[] read(int index) {
-        return ((VarBinaryVector) getValueVector()).get(index);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
deleted file mode 100644
index d69b546..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.util.StringUtf8Utils;
-
-import org.apache.arrow.vector.VarCharVector;
-
-/** {@link ArrowFieldReader} for VarChar. */
-@Internal
-public final class VarCharFieldReader extends ArrowFieldReader<String> {
-
-    public VarCharFieldReader(VarCharVector varCharVector) {
-        super(varCharVector);
-    }
-
-    @Override
-    public String read(int index) {
-        if (getValueVector().isNull(index)) {
-            return null;
-        } else {
-            byte[] bytes = ((VarCharVector) getValueVector()).get(index);
-            return StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length);
-        }
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
deleted file mode 100644
index d5527c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.serializers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** It takes {@link Row} as the input type. */
-@Internal
-public class RowArrowSerializer extends ArrowSerializer<Row> {
-    public RowArrowSerializer(RowType inputType, RowType outputType) {
-        super(inputType, outputType);
-    }
-
-    @Override
-    public ArrowWriter<Row> createArrowWriter() {
-        return ArrowUtils.createRowArrowWriter(rootWriter, inputType);
-    }
-
-    @Override
-    public ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
-        return ArrowUtils.createRowArrowReader(root, outputType);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
deleted file mode 100644
index 39cefc2..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** An Arrow {@link SourceFunction} which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowSourceFunction extends AbstractArrowSourceFunction<Row> {
-
-    private static final long serialVersionUID = 1L;
-
-    RowArrowSourceFunction(DataType dataType, byte[][] arrowData) {
-        super(dataType, arrowData);
-    }
-
-    @Override
-    ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
-        return ArrowUtils.createRowArrowReader(root, (RowType) dataType.getLogicalType());
-    }
-
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
deleted file mode 100644
index 463f69b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.types.Row;
-
-/** An Arrow TableSource which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowTableSource extends AbstractArrowTableSource<Row> {
-    public RowArrowTableSource(DataType dataType, byte[][] arrowData) {
-        super(dataType, arrowData);
-    }
-
-    @Override
-    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
-        return execEnv.addSource(new RowArrowSourceFunction(dataType, arrowData));
-    }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index 67bebc2..a5259e1 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -24,25 +24,17 @@ import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
 import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -68,12 +60,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 
-import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.TimeZone;
 
 /**
@@ -113,10 +101,6 @@ public final class PythonTypeUtils {
         return logicalType.accept(new PythonTypeUtils.LogicalTypeToProtoTypeConverter());
     }
 
-    public static TypeSerializer toFlinkTypeSerializer(LogicalType logicalType) {
-        return logicalType.accept(new LogicalTypeToTypeSerializerConverter());
-    }
-
     public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
         return logicalType.accept(new LogicalTypeToBlinkTypeSerializerConverter());
     }
@@ -163,16 +147,6 @@ public final class PythonTypeUtils {
     }
 
     /**
-     * Converts the internal representation of a SQL TIMESTAMP (long) to the Java type used for UDF
-     * parameters ({@link java.sql.Timestamp}).
-     *
-     * <p>Note: The implementation refers to {@link SqlDateTimeUtils#internalToTimestamp}.
-     */
-    public static java.sql.Timestamp internalToTimestamp(long v) {
-        return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
-    }
-
-    /**
      * Converts the Java type used for UDF parameters of SQL TIMESTAMP type ({@link
      * java.sql.Timestamp}) to internal representation (long).
      *
@@ -183,43 +157,7 @@ public final class PythonTypeUtils {
         return time + LOCAL_TZ.getOffset(time);
     }
 
-    /** Convert LogicalType to conversion class for flink planner. */
-    public static class LogicalTypeToConversionClassConverter
-            extends LogicalTypeDefaultVisitor<Class> {
-
-        public static final LogicalTypeToConversionClassConverter INSTANCE =
-                new LogicalTypeToConversionClassConverter();
-
-        private LogicalTypeToConversionClassConverter() {}
-
-        @Override
-        public Class visit(DateType dateType) {
-            return Date.class;
-        }
-
-        @Override
-        public Class visit(TimeType timeType) {
-            return Time.class;
-        }
-
-        @Override
-        public Class visit(TimestampType timestampType) {
-            return Timestamp.class;
-        }
-
-        @Override
-        public Class visit(ArrayType arrayType) {
-            Class elementClass = arrayType.getElementType().accept(this);
-            return Array.newInstance(elementClass, 0).getClass();
-        }
-
-        @Override
-        protected Class defaultMethod(LogicalType logicalType) {
-            return logicalType.getDefaultConversion();
-        }
-    }
-
-    private static class LogicalTypeToTypeSerializerConverter
+    private static class LogicalTypeToBlinkTypeSerializerConverter
             extends LogicalTypeDefaultVisitor<TypeSerializer> {
         @Override
         public TypeSerializer visit(BooleanType booleanType) {
@@ -267,78 +205,6 @@ public final class PythonTypeUtils {
         }
 
         @Override
-        public TypeSerializer visit(VarCharType varCharType) {
-            return StringSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(CharType charType) {
-            return StringSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(DateType dateType) {
-            return DateSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(TimeType timeType) {
-            return TimeSerializer.INSTANCE;
-        }
-
-        @Override
-        public TypeSerializer visit(TimestampType timestampType) {
-            return new TimestampSerializer(timestampType.getPrecision());
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public TypeSerializer visit(ArrayType arrayType) {
-            LogicalType elementType = arrayType.getElementType();
-            TypeSerializer<?> elementTypeSerializer = elementType.accept(this);
-            Class<?> elementClass =
-                    elementType.accept(LogicalTypeToConversionClassConverter.INSTANCE);
-            return new GenericArraySerializer(elementClass, elementTypeSerializer);
-        }
-
-        @Override
-        public TypeSerializer visit(MapType mapType) {
-            TypeSerializer<?> keyTypeSerializer = mapType.getKeyType().accept(this);
-            TypeSerializer<?> valueTypeSerializer = mapType.getValueType().accept(this);
-            return new MapSerializer<>(keyTypeSerializer, valueTypeSerializer);
-        }
-
-        @Override
-        public TypeSerializer visit(RowType rowType) {
-            final TypeSerializer[] fieldTypeSerializers =
-                    rowType.getFields().stream()
-                            .map(f -> f.getType().accept(this))
-                            .toArray(TypeSerializer[]::new);
-            return new RowSerializer(fieldTypeSerializers);
-        }
-
-        @Override
-        protected TypeSerializer defaultMethod(LogicalType logicalType) {
-            if (logicalType instanceof LegacyTypeInformationType) {
-                Class<?> typeClass =
-                        ((LegacyTypeInformationType) logicalType)
-                                .getTypeInformation()
-                                .getTypeClass();
-                if (typeClass == BigDecimal.class) {
-                    return BigDecSerializer.INSTANCE;
-                }
-            }
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Python UDF doesn't support logical type %s currently.",
-                            logicalType.asSummaryString()));
-        }
-    }
-
-    private static class LogicalTypeToBlinkTypeSerializerConverter
-            extends LogicalTypeToTypeSerializerConverter {
-
-        @Override
         public TypeSerializer visit(RowType rowType) {
             final TypeSerializer[] fieldTypeSerializers =
                     rowType.getFields().stream()
@@ -398,6 +264,14 @@ public final class PythonTypeUtils {
         public TypeSerializer visit(DecimalType decimalType) {
             return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
         }
+
+        @Override
+        protected TypeSerializer defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Python UDF doesn't support logical type %s currently.",
+                            logicalType.asSummaryString()));
+        }
     }
 
     /** Converter That convert the logicalType to the related Prototype. */
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
index 13a8d4a..33258c2 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
@@ -18,27 +18,10 @@
 
 package org.apache.flink.table.runtime.arrow;
 
-import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
 import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -103,7 +86,6 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -132,8 +114,7 @@ import static org.junit.Assert.assertEquals;
 /** Tests for {@link ArrowUtils}. */
 public class ArrowUtilsTest {
 
-    private static List<
-                    Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>>>
+    private static List<Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>>>
             testFields;
     private static RowType rowType;
     private static BufferAllocator allocator;
@@ -142,243 +123,219 @@ public class ArrowUtilsTest {
     public static void init() {
         testFields = new ArrayList<>();
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f1",
                         new TinyIntType(),
                         new ArrowType.Int(8, true),
                         RowTinyIntWriter.class,
                         TinyIntWriter.TinyIntWriterForRow.class,
-                        TinyIntFieldReader.class,
                         ArrowTinyIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f2",
                         new SmallIntType(),
                         new ArrowType.Int(8 * 2, true),
                         RowSmallIntWriter.class,
                         SmallIntWriter.SmallIntWriterForRow.class,
-                        SmallIntFieldReader.class,
                         ArrowSmallIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f3",
                         new IntType(),
                         new ArrowType.Int(8 * 4, true),
                         RowIntWriter.class,
                         IntWriter.IntWriterForRow.class,
-                        IntFieldReader.class,
                         ArrowIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f4",
                         new BigIntType(),
                         new ArrowType.Int(8 * 8, true),
                         RowBigIntWriter.class,
                         BigIntWriter.BigIntWriterForRow.class,
-                        BigIntFieldReader.class,
                         ArrowBigIntColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f5",
                         new BooleanType(),
                         new ArrowType.Bool(),
                         RowBooleanWriter.class,
                         BooleanWriter.BooleanWriterForRow.class,
-                        BooleanFieldReader.class,
                         ArrowBooleanColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f6",
                         new FloatType(),
                         new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
                         RowFloatWriter.class,
                         FloatWriter.FloatWriterForRow.class,
-                        FloatFieldReader.class,
                         ArrowFloatColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f7",
                         new DoubleType(),
                         new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
                         RowDoubleWriter.class,
                         DoubleWriter.DoubleWriterForRow.class,
-                        DoubleFieldReader.class,
                         ArrowDoubleColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f8",
                         new VarCharType(),
                         ArrowType.Utf8.INSTANCE,
                         RowVarCharWriter.class,
                         VarCharWriter.VarCharWriterForRow.class,
-                        VarCharFieldReader.class,
                         ArrowVarCharColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f9",
                         new VarBinaryType(),
                         ArrowType.Binary.INSTANCE,
                         RowVarBinaryWriter.class,
                         VarBinaryWriter.VarBinaryWriterForRow.class,
-                        VarBinaryFieldReader.class,
                         ArrowVarBinaryColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f10",
                         new DecimalType(10, 3),
                         new ArrowType.Decimal(10, 3),
                         RowDecimalWriter.class,
                         DecimalWriter.DecimalWriterForRow.class,
-                        DecimalFieldReader.class,
                         ArrowDecimalColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f11",
                         new DateType(),
                         new ArrowType.Date(DateUnit.DAY),
                         RowDateWriter.class,
                         DateWriter.DateWriterForRow.class,
-                        DateFieldReader.class,
                         ArrowDateColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f13",
                         new TimeType(0),
                         new ArrowType.Time(TimeUnit.SECOND, 32),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f14",
                         new TimeType(2),
                         new ArrowType.Time(TimeUnit.MILLISECOND, 32),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f15",
                         new TimeType(4),
                         new ArrowType.Time(TimeUnit.MICROSECOND, 64),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f16",
                         new TimeType(8),
                         new ArrowType.Time(TimeUnit.NANOSECOND, 64),
                         RowTimeWriter.class,
                         TimeWriter.TimeWriterForRow.class,
-                        TimeFieldReader.class,
                         ArrowTimeColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f17",
                         new LocalZonedTimestampType(0),
                         new ArrowType.Timestamp(TimeUnit.SECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f18",
                         new LocalZonedTimestampType(2),
                         new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f19",
                         new LocalZonedTimestampType(4),
                         new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f20",
                         new LocalZonedTimestampType(8),
                         new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f21",
                         new TimestampType(0),
                         new ArrowType.Timestamp(TimeUnit.SECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f22",
                         new TimestampType(2),
                         new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f23",
                         new TimestampType(4),
                         new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f24",
                         new TimestampType(8),
                         new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
                         RowTimestampWriter.class,
                         TimestampWriter.TimestampWriterForRow.class,
-                        TimestampFieldReader.class,
                         ArrowTimestampColumnVector.class));
 
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f25",
                         new ArrayType(new VarCharType()),
                         ArrowType.List.INSTANCE,
                         RowArrayWriter.class,
                         ArrayWriter.ArrayWriterForRow.class,
-                        ArrayFieldReader.class,
                         ArrowArrayColumnVector.class));
 
         RowType rowFieldType =
@@ -396,17 +353,16 @@ public class ArrowUtilsTest {
                                                         new RowType.RowField(
                                                                 "e2", new VarCharType())))))));
         testFields.add(
-                Tuple7.of(
+                Tuple6.of(
                         "f26",
                         rowFieldType,
                         ArrowType.Struct.INSTANCE,
                         RowRowWriter.class,
                         RowWriter.RowWriterForRow.class,
-                        RowFieldReader.class,
                         ArrowRowColumnVector.class));
 
         List<RowType.RowField> rowFields = new ArrayList<>();
-        for (Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>> field :
+        for (Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>> field :
                 testFields) {
             rowFields.add(new RowType.RowField(field.f0, field.f1));
         }
@@ -429,35 +385,13 @@ public class ArrowUtilsTest {
     }
 
     @Test
-    public void testCreateRowArrowReader() {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        RowArrowReader reader = ArrowUtils.createRowArrowReader(root, rowType);
-        ArrowFieldReader[] fieldReaders = reader.getFieldReaders();
-        for (int i = 0; i < fieldReaders.length; i++) {
-            assertEquals(testFields.get(i).f5, fieldReaders[i].getClass());
-        }
-    }
-
-    @Test
     public void testCreateRowDataArrowReader() {
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
         RowDataArrowReader reader = ArrowUtils.createRowDataArrowReader(root, rowType);
         ColumnVector[] columnVectors = reader.getColumnVectors();
         for (int i = 0; i < columnVectors.length; i++) {
-            assertEquals(testFields.get(i).f6, columnVectors[i].getClass());
-        }
-    }
-
-    @Test
-    public void testCreateRowArrowWriter() {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        ArrowWriter<Row> writer = ArrowUtils.createRowArrowWriter(root, rowType);
-        ArrowFieldWriter<Row>[] fieldWriters = writer.getFieldWriters();
-        for (int i = 0; i < fieldWriters.length; i++) {
-            assertEquals(testFields.get(i).f3, fieldWriters[i].getClass());
+            assertEquals(testFields.get(i).f5, columnVectors[i].getClass());
         }
     }
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
deleted file mode 100644
index 22c9e1b..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/** Tests for {@link ArrowReader} and {@link ArrowWriter} of Row. */
-public class RowArrowReaderWriterTest extends ArrowReaderWriterTestBase<Row> {
-    private static RowType rowType;
-    private static BufferAllocator allocator;
-
-    @BeforeClass
-    public static void init() {
-        List<LogicalType> fieldTypes = new ArrayList<>();
-        fieldTypes.add(new TinyIntType());
-        fieldTypes.add(new SmallIntType());
-        fieldTypes.add(new IntType());
-        fieldTypes.add(new BigIntType());
-        fieldTypes.add(new BooleanType());
-        fieldTypes.add(new FloatType());
-        fieldTypes.add(new DoubleType());
-        fieldTypes.add(new VarCharType());
-        fieldTypes.add(new VarBinaryType());
-        fieldTypes.add(new DecimalType(10, 0));
-        fieldTypes.add(new DateType());
-        fieldTypes.add(new TimeType(0));
-        fieldTypes.add(new TimeType(2));
-        fieldTypes.add(new TimeType(4));
-        fieldTypes.add(new TimeType(8));
-        fieldTypes.add(new LocalZonedTimestampType(0));
-        fieldTypes.add(new LocalZonedTimestampType(2));
-        fieldTypes.add(new LocalZonedTimestampType(4));
-        fieldTypes.add(new LocalZonedTimestampType(8));
-        fieldTypes.add(new TimestampType(0));
-        fieldTypes.add(new TimestampType(2));
-        fieldTypes.add(new TimestampType(4));
-        fieldTypes.add(new TimestampType(8));
-        fieldTypes.add(new ArrayType(new VarCharType()));
-        fieldTypes.add(
-                new RowType(
-                        Arrays.asList(
-                                new RowType.RowField("a", new IntType()),
-                                new RowType.RowField("b", new VarCharType()),
-                                new RowType.RowField("c", new ArrayType(new VarCharType())),
-                                new RowType.RowField("d", new TimestampType(2)),
-                                new RowType.RowField(
-                                        "e",
-                                        new RowType(
-                                                Arrays.asList(
-                                                        new RowType.RowField("e1", new IntType()),
-                                                        new RowType.RowField(
-                                                                "e2", new VarCharType())))))));
-
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
-        }
-        rowType = new RowType(rowFields);
-        allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
-    }
-
-    @Override
-    public ArrowReader<Row> createArrowReader(InputStream inputStream) throws IOException {
-        ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator);
-        reader.loadNextBatch();
-        return ArrowUtils.createRowArrowReader(reader.getVectorSchemaRoot(), rowType);
-    }
-
-    @Override
-    public Tuple2<ArrowWriter<Row>, ArrowStreamWriter> createArrowWriter(OutputStream outputStream)
-            throws IOException {
-        VectorSchemaRoot root =
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        ArrowWriter<Row> arrowWriter = ArrowUtils.createRowArrowWriter(root, rowType);
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, outputStream);
-        arrowStreamWriter.start();
-        return Tuple2.of(arrowWriter, arrowStreamWriter);
-    }
-
-    @Override
-    public Row[] getTestData() {
-        Row row1 =
-                Row.of(
-                        (byte) 1,
-                        (short) 2,
-                        3,
-                        4L,
-                        true,
-                        1.0f,
-                        1.0,
-                        "hello",
-                        "hello".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new String[] {null, null, null},
-                        Row.of(
-                                1,
-                                "hello",
-                                new String[] {null, null, null},
-                                new Timestamp(3600000),
-                                Row.of(1, "hello")));
-        Row row2 =
-                Row.of(
-                        null,
-                        (short) 2,
-                        3,
-                        4L,
-                        false,
-                        1.0f,
-                        1.0,
-                        "中文",
-                        "中文".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new String[] {"hello", "中文", null},
-                        Row.of(
-                                1,
-                                "hello",
-                                new String[] {"hello", "中文", null},
-                                new Timestamp(3600000),
-                                Row.of(1, "hello")));
-        Row row3 =
-                Row.of(
-                        (byte) 1,
-                        null,
-                        3,
-                        4L,
-                        true,
-                        1.0f,
-                        1.0,
-                        "hello",
-                        "hello".getBytes(),
-                        new BigDecimal(1),
-                        SqlDateTimeUtils.internalToDate(100),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        SqlDateTimeUtils.internalToTime(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        new Timestamp(3600000),
-                        null,
-                        null);
-        Row row4 = new Row(rowType.getFieldCount());
-        return new Row[] {row1, row2, row3, row4};
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
deleted file mode 100644
index 329bcec..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.junit.BeforeClass;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-/** Tests for {@link RowArrowSourceFunction}. */
-public class RowArrowSourceFunctionTest extends ArrowSourceFunctionTestBase<Row> {
-
-    private static List<LogicalType> fieldTypes = new ArrayList<>();
-    private static RowType rowType;
-    private static DataType dataType;
-    private static BufferAllocator allocator;
-
-    public RowArrowSourceFunctionTest() {
-        super(
-                VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator),
-                new RowSerializer(new TypeSerializer[] {StringSerializer.INSTANCE}),
-                Comparator.comparing(o -> (String) (o.getField(0))));
-    }
-
-    @BeforeClass
-    public static void init() {
-        fieldTypes.add(new VarCharType());
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        for (int i = 0; i < fieldTypes.size(); i++) {
-            rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
-        }
-        rowType = new RowType(rowFields);
-        dataType = TypeConversions.fromLogicalToDataType(rowType);
-        allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
-    }
-
-    @Override
-    public Tuple2<List<Row>, Integer> getTestData() {
-        return Tuple2.of(
-                Arrays.asList(
-                        Row.of("aaa"), Row.of("bbb"), Row.of("ccc"), Row.of("ddd"), Row.of("eee")),
-                3);
-    }
-
-    @Override
-    public ArrowWriter<Row> createArrowWriter() {
-        return ArrowUtils.createRowArrowWriter(root, rowType);
-    }
-
-    @Override
-    public AbstractArrowSourceFunction<Row> createArrowSourceFunction(byte[][] arrowData) {
-        return new RowArrowSourceFunction(dataType, arrowData);
-    }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
index 8e0b2ec..ab2f1c1 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.table.runtime.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -32,7 +29,6 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
-import java.sql.Date;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,17 +39,6 @@ import static org.junit.Assert.assertTrue;
 public class PythonTypeUtilsTest {
 
     @Test
-    public void testLogicalTypeToFlinkTypeSerializer() {
-        List<RowType.RowField> rowFields = new ArrayList<>();
-        rowFields.add(new RowType.RowField("f1", new BigIntType()));
-        RowType rowType = new RowType(rowFields);
-        TypeSerializer rowSerializer = PythonTypeUtils.toFlinkTypeSerializer(rowType);
-        assertTrue(rowSerializer instanceof RowSerializer);
-
-        assertEquals(1, ((RowSerializer) rowSerializer).getArity());
-    }
-
-    @Test
     public void testLogicalTypeToBlinkTypeSerializer() {
         List<RowType.RowField> rowFields = new ArrayList<>();
         rowFields.add(new RowType.RowField("f1", new BigIntType()));
@@ -85,19 +70,10 @@ public class PythonTypeUtilsTest {
         String expectedTestException =
                 "Python UDF doesn't support logical type `cat`.`db`.`MyType` currently.";
         try {
-            PythonTypeUtils.toFlinkTypeSerializer(logicalType);
+            PythonTypeUtils.toBlinkTypeSerializer(logicalType);
         } catch (Exception e) {
             assertTrue(
                     ExceptionUtils.findThrowableWithMessage(e, expectedTestException).isPresent());
         }
     }
-
-    @Test
-    public void testLogicalTypeToConversionClassConverter() {
-        PythonTypeUtils.LogicalTypeToConversionClassConverter converter =
-                PythonTypeUtils.LogicalTypeToConversionClassConverter.INSTANCE;
-        ArrayType arrayType = new ArrayType(new ArrayType(new DateType()));
-        Class<?> conversionClass = converter.visit(arrayType);
-        assertEquals(Date[][].class, conversionClass);
-    }
 }