You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 13:21:55 UTC
[flink] 02/02: [FLINK-22619][python] Drop Python UDF serializers for old planner
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a2afa88d910bc5d99e95c8f9069fb13f5f75edd5
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Thu May 27 20:01:26 2021 +0800
[FLINK-22619][python] Drop Python UDF serializers for old planner
This closes #16008.
---
.../flink/table/runtime/arrow/ArrowUtils.java | 171 +---------------
.../runtime/arrow/readers/ArrayFieldReader.java | 82 --------
.../runtime/arrow/readers/ArrowFieldReader.java | 48 -----
.../runtime/arrow/readers/BigIntFieldReader.java | 37 ----
.../runtime/arrow/readers/BooleanFieldReader.java | 37 ----
.../runtime/arrow/readers/DateFieldReader.java | 44 ----
.../runtime/arrow/readers/DecimalFieldReader.java | 39 ----
.../runtime/arrow/readers/DoubleFieldReader.java | 37 ----
.../runtime/arrow/readers/FloatFieldReader.java | 37 ----
.../runtime/arrow/readers/IntFieldReader.java | 37 ----
.../runtime/arrow/readers/RowArrowReader.java | 55 -----
.../runtime/arrow/readers/RowFieldReader.java | 50 -----
.../runtime/arrow/readers/SmallIntFieldReader.java | 37 ----
.../runtime/arrow/readers/TimeFieldReader.java | 68 -------
.../arrow/readers/TimestampFieldReader.java | 66 ------
.../runtime/arrow/readers/TinyIntFieldReader.java | 37 ----
.../arrow/readers/VarBinaryFieldReader.java | 37 ----
.../runtime/arrow/readers/VarCharFieldReader.java | 43 ----
.../arrow/serializers/RowArrowSerializer.java | 46 -----
.../arrow/sources/RowArrowSourceFunction.java | 52 -----
.../runtime/arrow/sources/RowArrowTableSource.java | 38 ----
.../table/runtime/typeutils/PythonTypeUtils.java | 144 +------------
.../flink/table/runtime/arrow/ArrowUtilsTest.java | 124 +++--------
.../runtime/arrow/RowArrowReaderWriterTest.java | 226 ---------------------
.../arrow/sources/RowArrowSourceFunctionTest.java | 87 --------
.../runtime/typeutils/PythonTypeUtilsTest.java | 26 +--
26 files changed, 40 insertions(+), 1665 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index 1264247..1821f5c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -32,26 +32,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
import org.apache.flink.table.runtime.arrow.sources.AbstractArrowTableSource;
import org.apache.flink.table.runtime.arrow.sources.ArrowTableSource;
-import org.apache.flink.table.runtime.arrow.sources.RowArrowTableSource;
import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -77,21 +59,6 @@ import org.apache.flink.table.runtime.arrow.writers.DecimalWriter;
import org.apache.flink.table.runtime.arrow.writers.DoubleWriter;
import org.apache.flink.table.runtime.arrow.writers.FloatWriter;
import org.apache.flink.table.runtime.arrow.writers.IntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowArrayWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBigIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowBooleanWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDateWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDecimalWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowDoubleWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowFloatWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowRowWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowSmallIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimeWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTimestampWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowTinyIntWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarBinaryWriter;
-import org.apache.flink.table.runtime.arrow.writers.RowVarCharWriter;
import org.apache.flink.table.runtime.arrow.writers.RowWriter;
import org.apache.flink.table.runtime.arrow.writers.SmallIntWriter;
import org.apache.flink.table.runtime.arrow.writers.TimeWriter;
@@ -238,73 +205,6 @@ public final class ArrowUtils {
return new Field(fieldName, fieldType, children);
}
- /** Creates an {@link ArrowWriter} for the specified {@link VectorSchemaRoot}. */
- public static ArrowWriter<Row> createRowArrowWriter(VectorSchemaRoot root, RowType rowType) {
- ArrowFieldWriter<Row>[] fieldWriters = new ArrowFieldWriter[root.getFieldVectors().size()];
- List<FieldVector> vectors = root.getFieldVectors();
- for (int i = 0; i < vectors.size(); i++) {
- FieldVector vector = vectors.get(i);
- vector.allocateNew();
- fieldWriters[i] = createRowArrowFieldWriter(vector, rowType.getTypeAt(i));
- }
-
- return new ArrowWriter<>(root, fieldWriters);
- }
-
- private static ArrowFieldWriter<Row> createRowArrowFieldWriter(
- ValueVector vector, LogicalType fieldType) {
- if (vector instanceof TinyIntVector) {
- return new RowTinyIntWriter((TinyIntVector) vector);
- } else if (vector instanceof SmallIntVector) {
- return new RowSmallIntWriter((SmallIntVector) vector);
- } else if (vector instanceof IntVector) {
- return new RowIntWriter((IntVector) vector);
- } else if (vector instanceof BigIntVector) {
- return new RowBigIntWriter((BigIntVector) vector);
- } else if (vector instanceof BitVector) {
- return new RowBooleanWriter((BitVector) vector);
- } else if (vector instanceof Float4Vector) {
- return new RowFloatWriter((Float4Vector) vector);
- } else if (vector instanceof Float8Vector) {
- return new RowDoubleWriter((Float8Vector) vector);
- } else if (vector instanceof VarCharVector) {
- return new RowVarCharWriter((VarCharVector) vector);
- } else if (vector instanceof VarBinaryVector) {
- return new RowVarBinaryWriter((VarBinaryVector) vector);
- } else if (vector instanceof DecimalVector) {
- DecimalVector decimalVector = (DecimalVector) vector;
- return new RowDecimalWriter(
- decimalVector, getPrecision(decimalVector), decimalVector.getScale());
- } else if (vector instanceof DateDayVector) {
- return new RowDateWriter((DateDayVector) vector);
- } else if (vector instanceof TimeSecVector
- || vector instanceof TimeMilliVector
- || vector instanceof TimeMicroVector
- || vector instanceof TimeNanoVector) {
- return new RowTimeWriter(vector);
- } else if (vector instanceof TimeStampVector
- && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
- return new RowTimestampWriter(vector);
- } else if (vector instanceof ListVector) {
- ListVector listVector = (ListVector) vector;
- LogicalType elementType = ((ArrayType) fieldType).getElementType();
- return new RowArrayWriter(
- listVector, createRowArrowFieldWriter(listVector.getDataVector(), elementType));
- } else if (vector instanceof StructVector) {
- RowType rowType = (RowType) fieldType;
- ArrowFieldWriter<Row>[] fieldsWriters = new ArrowFieldWriter[rowType.getFieldCount()];
- for (int i = 0; i < fieldsWriters.length; i++) {
- fieldsWriters[i] =
- createRowArrowFieldWriter(
- ((StructVector) vector).getVectorById(i), rowType.getTypeAt(i));
- }
- return new RowRowWriter((StructVector) vector, fieldsWriters);
- } else {
- throw new UnsupportedOperationException(
- String.format("Unsupported type %s.", fieldType));
- }
- }
-
/**
* Creates an {@link ArrowWriter} for blink planner for the specified {@link VectorSchemaRoot}.
*/
@@ -446,71 +346,6 @@ public final class ArrowUtils {
}
}
- /** Creates an {@link ArrowReader} for the specified {@link VectorSchemaRoot}. */
- public static RowArrowReader createRowArrowReader(VectorSchemaRoot root, RowType rowType) {
- List<ArrowFieldReader> fieldReaders = new ArrayList<>();
- List<FieldVector> fieldVectors = root.getFieldVectors();
- for (int i = 0; i < fieldVectors.size(); i++) {
- fieldReaders.add(createRowArrowFieldReader(fieldVectors.get(i), rowType.getTypeAt(i)));
- }
-
- return new RowArrowReader(fieldReaders.toArray(new ArrowFieldReader[0]));
- }
-
- public static ArrowFieldReader createRowArrowFieldReader(
- ValueVector vector, LogicalType fieldType) {
- if (vector instanceof TinyIntVector) {
- return new TinyIntFieldReader((TinyIntVector) vector);
- } else if (vector instanceof SmallIntVector) {
- return new SmallIntFieldReader((SmallIntVector) vector);
- } else if (vector instanceof IntVector) {
- return new IntFieldReader((IntVector) vector);
- } else if (vector instanceof BigIntVector) {
- return new BigIntFieldReader((BigIntVector) vector);
- } else if (vector instanceof BitVector) {
- return new BooleanFieldReader((BitVector) vector);
- } else if (vector instanceof Float4Vector) {
- return new FloatFieldReader((Float4Vector) vector);
- } else if (vector instanceof Float8Vector) {
- return new DoubleFieldReader((Float8Vector) vector);
- } else if (vector instanceof VarCharVector) {
- return new VarCharFieldReader((VarCharVector) vector);
- } else if (vector instanceof VarBinaryVector) {
- return new VarBinaryFieldReader((VarBinaryVector) vector);
- } else if (vector instanceof DecimalVector) {
- return new DecimalFieldReader((DecimalVector) vector);
- } else if (vector instanceof DateDayVector) {
- return new DateFieldReader((DateDayVector) vector);
- } else if (vector instanceof TimeSecVector
- || vector instanceof TimeMilliVector
- || vector instanceof TimeMicroVector
- || vector instanceof TimeNanoVector) {
- return new TimeFieldReader(vector);
- } else if (vector instanceof TimeStampVector
- && ((ArrowType.Timestamp) vector.getField().getType()).getTimezone() == null) {
- return new TimestampFieldReader(vector);
- } else if (vector instanceof ListVector) {
- ListVector listVector = (ListVector) vector;
- LogicalType elementType = ((ArrayType) fieldType).getElementType();
- return new ArrayFieldReader(
- listVector,
- createRowArrowFieldReader(listVector.getDataVector(), elementType),
- elementType);
- } else if (vector instanceof StructVector) {
- StructVector structVector = (StructVector) vector;
- ArrowFieldReader[] fieldReaders = new ArrowFieldReader[structVector.size()];
- for (int i = 0; i < fieldReaders.length; i++) {
- fieldReaders[i] =
- createRowArrowFieldReader(
- structVector.getVectorById(i), ((RowType) fieldType).getTypeAt(i));
- }
- return new RowFieldReader(structVector, fieldReaders);
- } else {
- throw new UnsupportedOperationException(
- String.format("Unsupported type %s.", fieldType));
- }
- }
-
/**
* Creates an {@link ArrowReader} for blink planner for the specified {@link VectorSchemaRoot}.
*/
@@ -580,11 +415,7 @@ public final class ArrowUtils {
public static AbstractArrowTableSource createArrowTableSource(
DataType dataType, String fileName) throws IOException {
try (FileInputStream fis = new FileInputStream(fileName)) {
- if (RowData.class.isAssignableFrom(dataType.getConversionClass())) {
- return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
- } else {
- return new RowArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
- }
+ return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
deleted file mode 100644
index 32130a1..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrayFieldReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.ListVector;
-
-import java.lang.reflect.Array;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Array. */
-@Internal
-public final class ArrayFieldReader extends ArrowFieldReader<Object[]> {
-
- private final ArrowFieldReader arrayData;
- private final Class<?> elementClass;
-
- public ArrayFieldReader(
- ListVector listVector, ArrowFieldReader arrayData, LogicalType elementType) {
- super(listVector);
- this.arrayData = Preconditions.checkNotNull(arrayData);
- this.elementClass = getElementClass(elementType);
- }
-
- @Override
- public Object[] read(int index) {
- if (getValueVector().isNull(index)) {
- return null;
- } else {
- int startIndex = index * ListVector.OFFSET_WIDTH;
- int start = getValueVector().getOffsetBuffer().getInt(startIndex);
- int end =
- getValueVector().getOffsetBuffer().getInt(startIndex + ListVector.OFFSET_WIDTH);
- Object[] result = (Object[]) Array.newInstance(elementClass, end - start);
- for (int i = 0; i < result.length; i++) {
- result[i] = arrayData.read(start + i);
- }
- return result;
- }
- }
-
- private Class<?> getElementClass(LogicalType elementType) {
- DataType dataType = TypeConversions.fromLogicalToDataType(elementType);
- if (elementType instanceof TimestampType) {
- // the default conversion class is java.time.LocalDateTime
- dataType = dataType.bridgedTo(Timestamp.class);
- } else if (elementType instanceof DateType) {
- // the default conversion class is java.time.LocalDate
- dataType = dataType.bridgedTo(Date.class);
- } else if (elementType instanceof TimeType) {
- // the default conversion class is java.time.LocalTime
- dataType = dataType.bridgedTo(Time.class);
- }
- return dataType.getConversionClass();
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
deleted file mode 100644
index 6773b9e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/ArrowFieldReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.ValueVector;
-
-/**
- * Base class for arrow field reader.
- *
- * @param <OUT> Type of the row to write.
- */
-@Internal
-public abstract class ArrowFieldReader<OUT> {
-
- /** Container which is used to store the sequence of values of a column to read. */
- private final ValueVector valueVector;
-
- public ArrowFieldReader(ValueVector valueVector) {
- this.valueVector = Preconditions.checkNotNull(valueVector);
- }
-
- /** Returns the underlying container which stores the sequence of values of a column to read. */
- public ValueVector getValueVector() {
- return valueVector;
- }
-
- /** Sets the field value as the specified value. */
- public abstract OUT read(int index);
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
deleted file mode 100644
index ef786db..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BigIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BigIntVector;
-
-/** {@link ArrowFieldReader} for BigInt. */
-@Internal
-public final class BigIntFieldReader extends ArrowFieldReader<Long> {
-
- public BigIntFieldReader(BigIntVector bigIntVector) {
- super(bigIntVector);
- }
-
- @Override
- public Long read(int index) {
- return ((BigIntVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
deleted file mode 100644
index 17ca9aa..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/BooleanFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.BitVector;
-
-/** {@link ArrowFieldReader} for Boolean. */
-@Internal
-public final class BooleanFieldReader extends ArrowFieldReader<Boolean> {
-
- public BooleanFieldReader(BitVector bitVector) {
- super(bitVector);
- }
-
- @Override
- public Boolean read(int index) {
- return ((BitVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
deleted file mode 100644
index 5eeb614..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DateFieldReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-
-import org.apache.arrow.vector.DateDayVector;
-
-import java.sql.Date;
-
-/** {@link ArrowFieldReader} for Date. */
-@Internal
-public final class DateFieldReader extends ArrowFieldReader<Date> {
-
- public DateFieldReader(DateDayVector dateDayVector) {
- super(dateDayVector);
- }
-
- @Override
- public Date read(int index) {
- if (getValueVector().isNull(index)) {
- return null;
- } else {
- return SqlDateTimeUtils.internalToDate(((DateDayVector) getValueVector()).get(index));
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
deleted file mode 100644
index dc6744b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DecimalFieldReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.DecimalVector;
-
-import java.math.BigDecimal;
-
-/** {@link ArrowFieldReader} for Decimal. */
-@Internal
-public final class DecimalFieldReader extends ArrowFieldReader<BigDecimal> {
-
- public DecimalFieldReader(DecimalVector decimalVector) {
- super(decimalVector);
- }
-
- @Override
- public BigDecimal read(int index) {
- return ((DecimalVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
deleted file mode 100644
index c7d7573..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/DoubleFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float8Vector;
-
-/** {@link ArrowFieldReader} for Double. */
-@Internal
-public final class DoubleFieldReader extends ArrowFieldReader<Double> {
-
- public DoubleFieldReader(Float8Vector doubleVector) {
- super(doubleVector);
- }
-
- @Override
- public Double read(int index) {
- return ((Float8Vector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
deleted file mode 100644
index 3276074..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/FloatFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.Float4Vector;
-
-/** {@link ArrowFieldReader} for Float. */
-@Internal
-public final class FloatFieldReader extends ArrowFieldReader<Float> {
-
- public FloatFieldReader(Float4Vector floatVector) {
- super(floatVector);
- }
-
- @Override
- public Float read(int index) {
- return ((Float4Vector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
deleted file mode 100644
index 4e9a605..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/IntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.IntVector;
-
-/** {@link ArrowFieldReader} for Int. */
-@Internal
-public final class IntFieldReader extends ArrowFieldReader<Integer> {
-
- public IntFieldReader(IntVector intVector) {
- super(intVector);
- }
-
- @Override
- public Integer read(int index) {
- return ((IntVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
deleted file mode 100644
index 2d1403d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowArrowReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/** {@link ArrowReader} which read the underlying Arrow format data as {@link Row}. */
-@Internal
-public final class RowArrowReader implements ArrowReader<Row> {
-
- /**
- * An array of readers which are responsible for the deserialization of each column of the rows.
- */
- private final ArrowFieldReader[] fieldReaders;
-
- /** Reusable row used to hold the deserialized result. */
- private final Row reuseRow;
-
- public RowArrowReader(ArrowFieldReader[] fieldReaders) {
- this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
- this.reuseRow = new Row(fieldReaders.length);
- }
-
- /** Gets the field readers. */
- public ArrowFieldReader[] getFieldReaders() {
- return fieldReaders;
- }
-
- @Override
- public Row read(int rowId) {
- for (int i = 0; i < fieldReaders.length; i++) {
- reuseRow.setField(i, fieldReaders[i].read(rowId));
- }
- return reuseRow;
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
deleted file mode 100644
index 673bd0a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/RowFieldReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.complex.StructVector;
-
-/** {@link ArrowFieldReader} for Row. */
-@Internal
-public final class RowFieldReader extends ArrowFieldReader<Row> {
-
- private final ArrowFieldReader[] fieldReaders;
-
- public RowFieldReader(StructVector structVector, ArrowFieldReader[] fieldReaders) {
- super(structVector);
- this.fieldReaders = Preconditions.checkNotNull(fieldReaders);
- }
-
- @Override
- public Row read(int index) {
- if (getValueVector().isNull(index)) {
- return null;
- } else {
- Row row = new Row(fieldReaders.length);
- for (int i = 0; i < fieldReaders.length; i++) {
- row.setField(i, fieldReaders[i].read(index));
- }
- return row;
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
deleted file mode 100644
index 160dc51..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/SmallIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.SmallIntVector;
-
-/** {@link ArrowFieldReader} for SmallInt. */
-@Internal
-public final class SmallIntFieldReader extends ArrowFieldReader<Short> {
-
- public SmallIntFieldReader(SmallIntVector smallIntVector) {
- super(smallIntVector);
- }
-
- @Override
- public Short read(int index) {
- return ((SmallIntVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
deleted file mode 100644
index d29b737..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimeFieldReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.ValueVector;
-
-import java.sql.Time;
-import java.util.TimeZone;
-
-/** {@link ArrowFieldReader} for Time. */
-@Internal
-public final class TimeFieldReader extends ArrowFieldReader<Time> {
-
- // The local time zone.
- private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
- public TimeFieldReader(ValueVector valueVector) {
- super(valueVector);
- Preconditions.checkState(
- valueVector instanceof TimeSecVector
- || valueVector instanceof TimeMilliVector
- || valueVector instanceof TimeMicroVector
- || valueVector instanceof TimeNanoVector);
- }
-
- @Override
- public Time read(int index) {
- ValueVector valueVector = getValueVector();
- if (valueVector.isNull(index)) {
- return null;
- } else {
- long timeMilli;
- if (valueVector instanceof TimeSecVector) {
- timeMilli = ((TimeSecVector) getValueVector()).get(index) * 1000;
- } else if (valueVector instanceof TimeMilliVector) {
- timeMilli = ((TimeMilliVector) getValueVector()).get(index);
- } else if (valueVector instanceof TimeMicroVector) {
- timeMilli = ((TimeMicroVector) getValueVector()).get(index) / 1000;
- } else {
- timeMilli = ((TimeNanoVector) getValueVector()).get(index) / 1000000;
- }
- return new Time(timeMilli - LOCAL_TZ.getOffset(timeMilli));
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
deleted file mode 100644
index 8dc29c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TimestampFieldReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-
-import java.sql.Timestamp;
-
-/** {@link ArrowFieldReader} for Timestamp. */
-@Internal
-public final class TimestampFieldReader extends ArrowFieldReader<Timestamp> {
-
- public TimestampFieldReader(ValueVector valueVector) {
- super(valueVector);
- Preconditions.checkState(
- valueVector instanceof TimeStampVector
- && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
- == null);
- }
-
- @Override
- public Timestamp read(int i) {
- ValueVector valueVector = getValueVector();
- if (valueVector.isNull(i)) {
- return null;
- } else {
- long millisecond;
- if (valueVector instanceof TimeStampSecVector) {
- millisecond = ((TimeStampSecVector) valueVector).get(i) * 1000;
- } else if (valueVector instanceof TimeStampMilliVector) {
- millisecond = ((TimeStampMilliVector) valueVector).get(i);
- } else if (valueVector instanceof TimeStampMicroVector) {
- millisecond = ((TimeStampMicroVector) valueVector).get(i) / 1000;
- } else {
- millisecond = ((TimeStampNanoVector) valueVector).get(i) / 1_000_000;
- }
- return PythonTypeUtils.internalToTimestamp(millisecond);
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
deleted file mode 100644
index f27f936..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/TinyIntFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.TinyIntVector;
-
-/** {@link ArrowFieldReader} for TinyInt. */
-@Internal
-public final class TinyIntFieldReader extends ArrowFieldReader<Byte> {
-
- public TinyIntFieldReader(TinyIntVector tinyIntVector) {
- super(tinyIntVector);
- }
-
- @Override
- public Byte read(int index) {
- return ((TinyIntVector) getValueVector()).getObject(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
deleted file mode 100644
index 354d031..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarBinaryFieldReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.arrow.vector.VarBinaryVector;
-
-/** {@link ArrowFieldReader} for VarBinary. */
-@Internal
-public final class VarBinaryFieldReader extends ArrowFieldReader<byte[]> {
-
- public VarBinaryFieldReader(VarBinaryVector varBinaryVector) {
- super(varBinaryVector);
- }
-
- @Override
- public byte[] read(int index) {
- return ((VarBinaryVector) getValueVector()).get(index);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
deleted file mode 100644
index d69b546..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/readers/VarCharFieldReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.readers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.util.StringUtf8Utils;
-
-import org.apache.arrow.vector.VarCharVector;
-
-/** {@link ArrowFieldReader} for VarChar. */
-@Internal
-public final class VarCharFieldReader extends ArrowFieldReader<String> {
-
- public VarCharFieldReader(VarCharVector varCharVector) {
- super(varCharVector);
- }
-
- @Override
- public String read(int index) {
- if (getValueVector().isNull(index)) {
- return null;
- } else {
- byte[] bytes = ((VarCharVector) getValueVector()).get(index);
- return StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length);
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
deleted file mode 100644
index d5527c7..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/RowArrowSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.serializers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** It takes {@link Row} as the input type. */
-@Internal
-public class RowArrowSerializer extends ArrowSerializer<Row> {
- public RowArrowSerializer(RowType inputType, RowType outputType) {
- super(inputType, outputType);
- }
-
- @Override
- public ArrowWriter<Row> createArrowWriter() {
- return ArrowUtils.createRowArrowWriter(rootWriter, inputType);
- }
-
- @Override
- public ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
- return ArrowUtils.createRowArrowReader(root, outputType);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
deleted file mode 100644
index 39cefc2..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.runtime.arrow.ArrowReader;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.vector.VectorSchemaRoot;
-
-/** An Arrow {@link SourceFunction} which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowSourceFunction extends AbstractArrowSourceFunction<Row> {
-
- private static final long serialVersionUID = 1L;
-
- RowArrowSourceFunction(DataType dataType, byte[][] arrowData) {
- super(dataType, arrowData);
- }
-
- @Override
- ArrowReader<Row> createArrowReader(VectorSchemaRoot root) {
- return ArrowUtils.createRowArrowReader(root, (RowType) dataType.getLogicalType());
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
deleted file mode 100644
index 463f69b..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/RowArrowTableSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.types.Row;
-
-/** An Arrow TableSource which takes {@link Row} as the type of the produced records. */
-@Internal
-public class RowArrowTableSource extends AbstractArrowTableSource<Row> {
- public RowArrowTableSource(DataType dataType, byte[][] arrowData) {
- super(dataType, arrowData);
- }
-
- @Override
- public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
- return execEnv.addSource(new RowArrowSourceFunction(dataType, arrowData));
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index 67bebc2..a5259e1 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -24,25 +24,17 @@ import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.DateSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimeSerializer;
-import org.apache.flink.table.runtime.typeutils.serializers.python.TimestampSerializer;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
@@ -68,12 +60,8 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.RoundingMode;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.TimeZone;
/**
@@ -113,10 +101,6 @@ public final class PythonTypeUtils {
return logicalType.accept(new PythonTypeUtils.LogicalTypeToProtoTypeConverter());
}
- public static TypeSerializer toFlinkTypeSerializer(LogicalType logicalType) {
- return logicalType.accept(new LogicalTypeToTypeSerializerConverter());
- }
-
public static TypeSerializer toBlinkTypeSerializer(LogicalType logicalType) {
return logicalType.accept(new LogicalTypeToBlinkTypeSerializerConverter());
}
@@ -163,16 +147,6 @@ public final class PythonTypeUtils {
}
/**
- * Converts the internal representation of a SQL TIMESTAMP (long) to the Java type used for UDF
- * parameters ({@link java.sql.Timestamp}).
- *
- * <p>Note: The implementation refers to {@link SqlDateTimeUtils#internalToTimestamp}.
- */
- public static java.sql.Timestamp internalToTimestamp(long v) {
- return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
- }
-
- /**
* Converts the Java type used for UDF parameters of SQL TIMESTAMP type ({@link
* java.sql.Timestamp}) to internal representation (long).
*
@@ -183,43 +157,7 @@ public final class PythonTypeUtils {
return time + LOCAL_TZ.getOffset(time);
}
- /** Convert LogicalType to conversion class for flink planner. */
- public static class LogicalTypeToConversionClassConverter
- extends LogicalTypeDefaultVisitor<Class> {
-
- public static final LogicalTypeToConversionClassConverter INSTANCE =
- new LogicalTypeToConversionClassConverter();
-
- private LogicalTypeToConversionClassConverter() {}
-
- @Override
- public Class visit(DateType dateType) {
- return Date.class;
- }
-
- @Override
- public Class visit(TimeType timeType) {
- return Time.class;
- }
-
- @Override
- public Class visit(TimestampType timestampType) {
- return Timestamp.class;
- }
-
- @Override
- public Class visit(ArrayType arrayType) {
- Class elementClass = arrayType.getElementType().accept(this);
- return Array.newInstance(elementClass, 0).getClass();
- }
-
- @Override
- protected Class defaultMethod(LogicalType logicalType) {
- return logicalType.getDefaultConversion();
- }
- }
-
- private static class LogicalTypeToTypeSerializerConverter
+ private static class LogicalTypeToBlinkTypeSerializerConverter
extends LogicalTypeDefaultVisitor<TypeSerializer> {
@Override
public TypeSerializer visit(BooleanType booleanType) {
@@ -267,78 +205,6 @@ public final class PythonTypeUtils {
}
@Override
- public TypeSerializer visit(VarCharType varCharType) {
- return StringSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer visit(CharType charType) {
- return StringSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer visit(DateType dateType) {
- return DateSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer visit(TimeType timeType) {
- return TimeSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer visit(TimestampType timestampType) {
- return new TimestampSerializer(timestampType.getPrecision());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeSerializer visit(ArrayType arrayType) {
- LogicalType elementType = arrayType.getElementType();
- TypeSerializer<?> elementTypeSerializer = elementType.accept(this);
- Class<?> elementClass =
- elementType.accept(LogicalTypeToConversionClassConverter.INSTANCE);
- return new GenericArraySerializer(elementClass, elementTypeSerializer);
- }
-
- @Override
- public TypeSerializer visit(MapType mapType) {
- TypeSerializer<?> keyTypeSerializer = mapType.getKeyType().accept(this);
- TypeSerializer<?> valueTypeSerializer = mapType.getValueType().accept(this);
- return new MapSerializer<>(keyTypeSerializer, valueTypeSerializer);
- }
-
- @Override
- public TypeSerializer visit(RowType rowType) {
- final TypeSerializer[] fieldTypeSerializers =
- rowType.getFields().stream()
- .map(f -> f.getType().accept(this))
- .toArray(TypeSerializer[]::new);
- return new RowSerializer(fieldTypeSerializers);
- }
-
- @Override
- protected TypeSerializer defaultMethod(LogicalType logicalType) {
- if (logicalType instanceof LegacyTypeInformationType) {
- Class<?> typeClass =
- ((LegacyTypeInformationType) logicalType)
- .getTypeInformation()
- .getTypeClass();
- if (typeClass == BigDecimal.class) {
- return BigDecSerializer.INSTANCE;
- }
- }
- throw new UnsupportedOperationException(
- String.format(
- "Python UDF doesn't support logical type %s currently.",
- logicalType.asSummaryString()));
- }
- }
-
- private static class LogicalTypeToBlinkTypeSerializerConverter
- extends LogicalTypeToTypeSerializerConverter {
-
- @Override
public TypeSerializer visit(RowType rowType) {
final TypeSerializer[] fieldTypeSerializers =
rowType.getFields().stream()
@@ -398,6 +264,14 @@ public final class PythonTypeUtils {
public TypeSerializer visit(DecimalType decimalType) {
return new DecimalDataSerializer(decimalType.getPrecision(), decimalType.getScale());
}
+
+ @Override
+ protected TypeSerializer defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Python UDF doesn't support logical type %s currently.",
+ logicalType.asSummaryString()));
+ }
}
/** Converter That convert the logicalType to the related Prototype. */
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
index 13a8d4a..33258c2 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/ArrowUtilsTest.java
@@ -18,27 +18,10 @@
package org.apache.flink.table.runtime.arrow;
-import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.BooleanFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DateFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DecimalFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.DoubleFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.FloatFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.IntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.RowArrowReader;
-import org.apache.flink.table.runtime.arrow.readers.RowFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.SmallIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimeFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TimestampFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.TinyIntFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarBinaryFieldReader;
-import org.apache.flink.table.runtime.arrow.readers.VarCharFieldReader;
import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBooleanColumnVector;
@@ -103,7 +86,6 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -132,8 +114,7 @@ import static org.junit.Assert.assertEquals;
/** Tests for {@link ArrowUtils}. */
public class ArrowUtilsTest {
- private static List<
- Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>>>
+ private static List<Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>>>
testFields;
private static RowType rowType;
private static BufferAllocator allocator;
@@ -142,243 +123,219 @@ public class ArrowUtilsTest {
public static void init() {
testFields = new ArrayList<>();
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f1",
new TinyIntType(),
new ArrowType.Int(8, true),
RowTinyIntWriter.class,
TinyIntWriter.TinyIntWriterForRow.class,
- TinyIntFieldReader.class,
ArrowTinyIntColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f2",
new SmallIntType(),
new ArrowType.Int(8 * 2, true),
RowSmallIntWriter.class,
SmallIntWriter.SmallIntWriterForRow.class,
- SmallIntFieldReader.class,
ArrowSmallIntColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f3",
new IntType(),
new ArrowType.Int(8 * 4, true),
RowIntWriter.class,
IntWriter.IntWriterForRow.class,
- IntFieldReader.class,
ArrowIntColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f4",
new BigIntType(),
new ArrowType.Int(8 * 8, true),
RowBigIntWriter.class,
BigIntWriter.BigIntWriterForRow.class,
- BigIntFieldReader.class,
ArrowBigIntColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f5",
new BooleanType(),
new ArrowType.Bool(),
RowBooleanWriter.class,
BooleanWriter.BooleanWriterForRow.class,
- BooleanFieldReader.class,
ArrowBooleanColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f6",
new FloatType(),
new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
RowFloatWriter.class,
FloatWriter.FloatWriterForRow.class,
- FloatFieldReader.class,
ArrowFloatColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f7",
new DoubleType(),
new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
RowDoubleWriter.class,
DoubleWriter.DoubleWriterForRow.class,
- DoubleFieldReader.class,
ArrowDoubleColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f8",
new VarCharType(),
ArrowType.Utf8.INSTANCE,
RowVarCharWriter.class,
VarCharWriter.VarCharWriterForRow.class,
- VarCharFieldReader.class,
ArrowVarCharColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f9",
new VarBinaryType(),
ArrowType.Binary.INSTANCE,
RowVarBinaryWriter.class,
VarBinaryWriter.VarBinaryWriterForRow.class,
- VarBinaryFieldReader.class,
ArrowVarBinaryColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f10",
new DecimalType(10, 3),
new ArrowType.Decimal(10, 3),
RowDecimalWriter.class,
DecimalWriter.DecimalWriterForRow.class,
- DecimalFieldReader.class,
ArrowDecimalColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f11",
new DateType(),
new ArrowType.Date(DateUnit.DAY),
RowDateWriter.class,
DateWriter.DateWriterForRow.class,
- DateFieldReader.class,
ArrowDateColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f13",
new TimeType(0),
new ArrowType.Time(TimeUnit.SECOND, 32),
RowTimeWriter.class,
TimeWriter.TimeWriterForRow.class,
- TimeFieldReader.class,
ArrowTimeColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f14",
new TimeType(2),
new ArrowType.Time(TimeUnit.MILLISECOND, 32),
RowTimeWriter.class,
TimeWriter.TimeWriterForRow.class,
- TimeFieldReader.class,
ArrowTimeColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f15",
new TimeType(4),
new ArrowType.Time(TimeUnit.MICROSECOND, 64),
RowTimeWriter.class,
TimeWriter.TimeWriterForRow.class,
- TimeFieldReader.class,
ArrowTimeColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f16",
new TimeType(8),
new ArrowType.Time(TimeUnit.NANOSECOND, 64),
RowTimeWriter.class,
TimeWriter.TimeWriterForRow.class,
- TimeFieldReader.class,
ArrowTimeColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f17",
new LocalZonedTimestampType(0),
new ArrowType.Timestamp(TimeUnit.SECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f18",
new LocalZonedTimestampType(2),
new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f19",
new LocalZonedTimestampType(4),
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f20",
new LocalZonedTimestampType(8),
new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f21",
new TimestampType(0),
new ArrowType.Timestamp(TimeUnit.SECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f22",
new TimestampType(2),
new ArrowType.Timestamp(TimeUnit.MILLISECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f23",
new TimestampType(4),
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f24",
new TimestampType(8),
new ArrowType.Timestamp(TimeUnit.NANOSECOND, null),
RowTimestampWriter.class,
TimestampWriter.TimestampWriterForRow.class,
- TimestampFieldReader.class,
ArrowTimestampColumnVector.class));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f25",
new ArrayType(new VarCharType()),
ArrowType.List.INSTANCE,
RowArrayWriter.class,
ArrayWriter.ArrayWriterForRow.class,
- ArrayFieldReader.class,
ArrowArrayColumnVector.class));
RowType rowFieldType =
@@ -396,17 +353,16 @@ public class ArrowUtilsTest {
new RowType.RowField(
"e2", new VarCharType())))))));
testFields.add(
- Tuple7.of(
+ Tuple6.of(
"f26",
rowFieldType,
ArrowType.Struct.INSTANCE,
RowRowWriter.class,
RowWriter.RowWriterForRow.class,
- RowFieldReader.class,
ArrowRowColumnVector.class));
List<RowType.RowField> rowFields = new ArrayList<>();
- for (Tuple7<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>, Class<?>> field :
+ for (Tuple6<String, LogicalType, ArrowType, Class<?>, Class<?>, Class<?>> field :
testFields) {
rowFields.add(new RowType.RowField(field.f0, field.f1));
}
@@ -429,35 +385,13 @@ public class ArrowUtilsTest {
}
@Test
- public void testCreateRowArrowReader() {
- VectorSchemaRoot root =
- VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
- RowArrowReader reader = ArrowUtils.createRowArrowReader(root, rowType);
- ArrowFieldReader[] fieldReaders = reader.getFieldReaders();
- for (int i = 0; i < fieldReaders.length; i++) {
- assertEquals(testFields.get(i).f5, fieldReaders[i].getClass());
- }
- }
-
- @Test
public void testCreateRowDataArrowReader() {
VectorSchemaRoot root =
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
RowDataArrowReader reader = ArrowUtils.createRowDataArrowReader(root, rowType);
ColumnVector[] columnVectors = reader.getColumnVectors();
for (int i = 0; i < columnVectors.length; i++) {
- assertEquals(testFields.get(i).f6, columnVectors[i].getClass());
- }
- }
-
- @Test
- public void testCreateRowArrowWriter() {
- VectorSchemaRoot root =
- VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
- ArrowWriter<Row> writer = ArrowUtils.createRowArrowWriter(root, rowType);
- ArrowFieldWriter<Row>[] fieldWriters = writer.getFieldWriters();
- for (int i = 0; i < fieldWriters.length; i++) {
- assertEquals(testFields.get(i).f3, fieldWriters[i].getClass());
+ assertEquals(testFields.get(i).f5, columnVectors[i].getClass());
}
}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
deleted file mode 100644
index 22c9e1b..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/RowArrowReaderWriterTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DateType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/** Tests for {@link ArrowReader} and {@link ArrowWriter} of Row. */
-public class RowArrowReaderWriterTest extends ArrowReaderWriterTestBase<Row> {
- private static RowType rowType;
- private static BufferAllocator allocator;
-
- @BeforeClass
- public static void init() {
- List<LogicalType> fieldTypes = new ArrayList<>();
- fieldTypes.add(new TinyIntType());
- fieldTypes.add(new SmallIntType());
- fieldTypes.add(new IntType());
- fieldTypes.add(new BigIntType());
- fieldTypes.add(new BooleanType());
- fieldTypes.add(new FloatType());
- fieldTypes.add(new DoubleType());
- fieldTypes.add(new VarCharType());
- fieldTypes.add(new VarBinaryType());
- fieldTypes.add(new DecimalType(10, 0));
- fieldTypes.add(new DateType());
- fieldTypes.add(new TimeType(0));
- fieldTypes.add(new TimeType(2));
- fieldTypes.add(new TimeType(4));
- fieldTypes.add(new TimeType(8));
- fieldTypes.add(new LocalZonedTimestampType(0));
- fieldTypes.add(new LocalZonedTimestampType(2));
- fieldTypes.add(new LocalZonedTimestampType(4));
- fieldTypes.add(new LocalZonedTimestampType(8));
- fieldTypes.add(new TimestampType(0));
- fieldTypes.add(new TimestampType(2));
- fieldTypes.add(new TimestampType(4));
- fieldTypes.add(new TimestampType(8));
- fieldTypes.add(new ArrayType(new VarCharType()));
- fieldTypes.add(
- new RowType(
- Arrays.asList(
- new RowType.RowField("a", new IntType()),
- new RowType.RowField("b", new VarCharType()),
- new RowType.RowField("c", new ArrayType(new VarCharType())),
- new RowType.RowField("d", new TimestampType(2)),
- new RowType.RowField(
- "e",
- new RowType(
- Arrays.asList(
- new RowType.RowField("e1", new IntType()),
- new RowType.RowField(
- "e2", new VarCharType())))))));
-
- List<RowType.RowField> rowFields = new ArrayList<>();
- for (int i = 0; i < fieldTypes.size(); i++) {
- rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
- }
- rowType = new RowType(rowFields);
- allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
- }
-
- @Override
- public ArrowReader<Row> createArrowReader(InputStream inputStream) throws IOException {
- ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator);
- reader.loadNextBatch();
- return ArrowUtils.createRowArrowReader(reader.getVectorSchemaRoot(), rowType);
- }
-
- @Override
- public Tuple2<ArrowWriter<Row>, ArrowStreamWriter> createArrowWriter(OutputStream outputStream)
- throws IOException {
- VectorSchemaRoot root =
- VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
- ArrowWriter<Row> arrowWriter = ArrowUtils.createRowArrowWriter(root, rowType);
- ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, outputStream);
- arrowStreamWriter.start();
- return Tuple2.of(arrowWriter, arrowStreamWriter);
- }
-
- @Override
- public Row[] getTestData() {
- Row row1 =
- Row.of(
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- true,
- 1.0f,
- 1.0,
- "hello",
- "hello".getBytes(),
- new BigDecimal(1),
- SqlDateTimeUtils.internalToDate(100),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new String[] {null, null, null},
- Row.of(
- 1,
- "hello",
- new String[] {null, null, null},
- new Timestamp(3600000),
- Row.of(1, "hello")));
- Row row2 =
- Row.of(
- null,
- (short) 2,
- 3,
- 4L,
- false,
- 1.0f,
- 1.0,
- "中文",
- "中文".getBytes(),
- new BigDecimal(1),
- SqlDateTimeUtils.internalToDate(100),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new String[] {"hello", "中文", null},
- Row.of(
- 1,
- "hello",
- new String[] {"hello", "中文", null},
- new Timestamp(3600000),
- Row.of(1, "hello")));
- Row row3 =
- Row.of(
- (byte) 1,
- null,
- 3,
- 4L,
- true,
- 1.0f,
- 1.0,
- "hello",
- "hello".getBytes(),
- new BigDecimal(1),
- SqlDateTimeUtils.internalToDate(100),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- SqlDateTimeUtils.internalToTime(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- new Timestamp(3600000),
- null,
- null);
- Row row4 = new Row(rowType.getFieldCount());
- return new Row[] {row1, row2, row3, row4};
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
deleted file mode 100644
index 329bcec..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.arrow.sources;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.junit.BeforeClass;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-/** Tests for {@link RowArrowSourceFunction}. */
-public class RowArrowSourceFunctionTest extends ArrowSourceFunctionTestBase<Row> {
-
- private static List<LogicalType> fieldTypes = new ArrayList<>();
- private static RowType rowType;
- private static DataType dataType;
- private static BufferAllocator allocator;
-
- public RowArrowSourceFunctionTest() {
- super(
- VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator),
- new RowSerializer(new TypeSerializer[] {StringSerializer.INSTANCE}),
- Comparator.comparing(o -> (String) (o.getField(0))));
- }
-
- @BeforeClass
- public static void init() {
- fieldTypes.add(new VarCharType());
- List<RowType.RowField> rowFields = new ArrayList<>();
- for (int i = 0; i < fieldTypes.size(); i++) {
- rowFields.add(new RowType.RowField("f" + i, fieldTypes.get(i)));
- }
- rowType = new RowType(rowFields);
- dataType = TypeConversions.fromLogicalToDataType(rowType);
- allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
- }
-
- @Override
- public Tuple2<List<Row>, Integer> getTestData() {
- return Tuple2.of(
- Arrays.asList(
- Row.of("aaa"), Row.of("bbb"), Row.of("ccc"), Row.of("ddd"), Row.of("eee")),
- 3);
- }
-
- @Override
- public ArrowWriter<Row> createArrowWriter() {
- return ArrowUtils.createRowArrowWriter(root, rowType);
- }
-
- @Override
- public AbstractArrowSourceFunction<Row> createArrowSourceFunction(byte[][] arrowData) {
- return new RowArrowSourceFunction(dataType, arrowData);
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
index 8e0b2ec..ab2f1c1 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtilsTest.java
@@ -19,12 +19,9 @@
package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -32,7 +29,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
-import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
@@ -43,17 +39,6 @@ import static org.junit.Assert.assertTrue;
public class PythonTypeUtilsTest {
@Test
- public void testLogicalTypeToFlinkTypeSerializer() {
- List<RowType.RowField> rowFields = new ArrayList<>();
- rowFields.add(new RowType.RowField("f1", new BigIntType()));
- RowType rowType = new RowType(rowFields);
- TypeSerializer rowSerializer = PythonTypeUtils.toFlinkTypeSerializer(rowType);
- assertTrue(rowSerializer instanceof RowSerializer);
-
- assertEquals(1, ((RowSerializer) rowSerializer).getArity());
- }
-
- @Test
public void testLogicalTypeToBlinkTypeSerializer() {
List<RowType.RowField> rowFields = new ArrayList<>();
rowFields.add(new RowType.RowField("f1", new BigIntType()));
@@ -85,19 +70,10 @@ public class PythonTypeUtilsTest {
String expectedTestException =
"Python UDF doesn't support logical type `cat`.`db`.`MyType` currently.";
try {
- PythonTypeUtils.toFlinkTypeSerializer(logicalType);
+ PythonTypeUtils.toBlinkTypeSerializer(logicalType);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(e, expectedTestException).isPresent());
}
}
-
- @Test
- public void testLogicalTypeToConversionClassConverter() {
- PythonTypeUtils.LogicalTypeToConversionClassConverter converter =
- PythonTypeUtils.LogicalTypeToConversionClassConverter.INSTANCE;
- ArrayType arrayType = new ArrayType(new ArrayType(new DateType()));
- Class<?> conversionClass = converter.visit(arrayType);
- assertEquals(Date[][].class, conversionClass);
- }
}