You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/09/20 12:29:53 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2] Improve orc read strategy (#2747)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new af34beda3 [Improve][Connector-V2] Improve orc read strategy (#2747)
af34beda3 is described below

commit af34beda376e49ffa376bfb07bf2222241557638
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Tue Sep 20 20:29:47 2022 +0800

    [Improve][Connector-V2] Improve orc read strategy (#2747)
    
    * [Improve][Connector-V2] Improve read orc
---
 .../file/source/reader/OrcReadStrategy.java        | 217 ++++++++++++++++-----
 .../seatunnel/file/writer/OrcReadStrategyTest.java |  58 ++++++
 .../src/test/resources/test.orc                    | Bin 0 -> 1959 bytes
 3 files changed, 222 insertions(+), 53 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 209965b63..3ebffb5a1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -18,7 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -52,9 +57,10 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -112,7 +118,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             SeaTunnelDataType<?>[] types = new SeaTunnelDataType[schema.getFieldNames().size()];
             for (int i = 0; i < schema.getFieldNames().size(); i++) {
                 fields[i] = schema.getFieldNames().get(i);
-                types[i] = BasicType.STRING_TYPE;
+                types[i] = orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
             }
             seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
             return seaTunnelRowTypeInfo;
@@ -159,7 +165,78 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             return checkResult;
         } catch (FilePluginException | IOException e) {
             String errorMsg = String.format("Check orc file [%s] error", path);
-            throw new RuntimeException(errorMsg, e);
+            throw new UnsupportedOperationException(errorMsg, e);
+        }
+    }
+
+    private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription typeDescription) {
+        switch (typeDescription.getCategory()) {
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case INT:
+                return BasicType.INT_TYPE;
+            case BYTE:
+                return BasicType.BYTE_TYPE;
+            case SHORT:
+                return BasicType.SHORT_TYPE;
+            case LONG:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case STRING:
+            case VARCHAR:
+            case CHAR:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case DECIMAL:
+                int precision = typeDescription.getPrecision();
+                int scale = typeDescription.getScale();
+                return new DecimalType(precision, scale);
+            case LIST:
+                TypeDescription listType = typeDescription.getChildren().get(0);
+                SeaTunnelDataType<?> seaTunnelDataType = orcDataType2SeaTunnelDataType(listType);
+                switch (seaTunnelDataType.getSqlType()) {
+                    case STRING:
+                        return ArrayType.STRING_ARRAY_TYPE;
+                    case BOOLEAN:
+                        return ArrayType.BOOLEAN_ARRAY_TYPE;
+                    case TINYINT:
+                        return ArrayType.BYTE_ARRAY_TYPE;
+                    case SMALLINT:
+                        return ArrayType.SHORT_ARRAY_TYPE;
+                    case INT:
+                        return ArrayType.INT_ARRAY_TYPE;
+                    case BIGINT:
+                        return ArrayType.LONG_ARRAY_TYPE;
+                    case FLOAT:
+                        return ArrayType.FLOAT_ARRAY_TYPE;
+                    case DOUBLE:
+                        return ArrayType.DOUBLE_ARRAY_TYPE;
+                    default:
+                        String errorMsg = String.format("SeaTunnel array type not supported this genericType [%s] yet", seaTunnelDataType);
+                        throw new UnsupportedOperationException(errorMsg);
+                }
+            case MAP:
+                TypeDescription keyType = typeDescription.getChildren().get(0);
+                TypeDescription valueType = typeDescription.getChildren().get(1);
+                return new MapType<>(orcDataType2SeaTunnelDataType(keyType), orcDataType2SeaTunnelDataType(valueType));
+            case STRUCT:
+                List<TypeDescription> children = typeDescription.getChildren();
+                String[] fieldNames = typeDescription.getFieldNames().toArray(new String[0]);
+                SeaTunnelDataType<?>[] fieldTypes = children.stream().map(this::orcDataType2SeaTunnelDataType).toArray(SeaTunnelDataType<?>[]::new);
+                return new SeaTunnelRowType(fieldNames, fieldTypes);
+            default:
+                // do nothing
+                // never get in there
+                String errorMsg = String.format("SeaTunnel file connector not supported this orc type [%s] yet", typeDescription.getCategory());
+                throw new UnsupportedOperationException(errorMsg);
         }
     }
 
@@ -172,9 +249,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     break;
                 case DOUBLE:
                     columnObj = ((DoubleColumnVector) colVec).vector[rowNum];
+                    if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+                        columnObj = ((Double) columnObj).floatValue();
+                    }
                     break;
                 case BYTES:
-                    columnObj = readBytesVal(colVec, rowNum);
+                    columnObj = readBytesVal(colVec, colType, rowNum);
                     break;
                 case DECIMAL:
                     columnObj = readDecimalVal(colVec, rowNum);
@@ -195,9 +275,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     columnObj = readUnionVal(colVec, colType, rowNum);
                     break;
                 default:
-                    throw new RuntimeException(
-                            "ReadColumn: unsupported ORC file column type: " + colVec.type.name()
-                    );
+                    throw new UnsupportedOperationException("ReadColumn: unsupported ORC file column type: " + colVec.type.name());
             }
         }
         return columnObj;
@@ -214,17 +292,24 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             } else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
                 colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
             } else if (colType.getCategory() == TypeDescription.Category.DATE) {
-                colObj = new Date(longVal);
+                colObj = LocalDate.ofEpochDay(longVal);
+            } else if (colType.getCategory() == TypeDescription.Category.BYTE) {
+                colObj = (byte) longVal;
+            } else if (colType.getCategory() == TypeDescription.Category.SHORT) {
+                colObj = (short) longVal;
             }
         }
         return colObj;
     }
 
-    private Object readBytesVal(ColumnVector colVec, int rowNum) {
+    private Object readBytesVal(ColumnVector colVec, TypeDescription typeDescription, int rowNum) {
         Object bytesObj = null;
         if (!colVec.isNull[rowNum]) {
             BytesColumnVector bytesVector = (BytesColumnVector) colVec;
             bytesObj = bytesVector.toString(rowNum);
+            if (typeDescription.getCategory() == TypeDescription.Category.BINARY) {
+                bytesObj = ((String) bytesObj).getBytes();
+            }
         }
         return bytesObj;
     }
@@ -246,9 +331,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             long millis = timestampVec.time[rowNum];
             Timestamp timestamp = new Timestamp(millis);
             timestamp.setNanos(nanos);
-            timestampVal = timestamp;
+            timestampVal = timestamp.toLocalDateTime();
             if (colType.getCategory() == TypeDescription.Category.DATE) {
-                timestampVal = new Date(timestamp.getTime());
+                timestampVal = LocalDate.ofEpochDay(timestamp.getTime());
             }
         }
         return timestampVal;
@@ -257,15 +342,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
     private Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
         Object structObj = null;
         if (!colVec.isNull[rowNum]) {
-            List<Object> fieldValList = new ArrayList<>();
             StructColumnVector structVector = (StructColumnVector) colVec;
             ColumnVector[] fieldVec = structVector.fields;
+            Object[] fieldValues = new Object[fieldVec.length];
             List<TypeDescription> fieldTypes = colType.getChildren();
             for (int i = 0; i < fieldVec.length; i++) {
                 Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), rowNum);
-                fieldValList.add(fieldObj);
+                fieldValues[i] = fieldObj;
             }
-            structObj = fieldValList;
+            structObj = new SeaTunnelRow(fieldValues);
         }
         return structObj;
     }
@@ -281,13 +366,13 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             TypeDescription valueType = mapTypes.get(1);
             ColumnVector keyChild = mapVector.keys;
             ColumnVector valueChild = mapVector.values;
-            List<Object> keyList = readMapVector(keyChild, keyType, offset, mapSize);
-            List<Object> valueList = readMapVector(valueChild, valueType, offset, mapSize);
-            for (int i = 0; i < keyList.size(); i++) {
-                objMap.put(keyList.get(i), valueList.get(i));
+            Object[] keyList = readMapVector(keyChild, keyType, offset, mapSize);
+            Object[] valueList = readMapVector(valueChild, valueType, offset, mapSize);
+            for (int i = 0; i < keyList.length; i++) {
+                objMap.put(keyList[i], valueList[i]);
             }
         } else {
-            throw new RuntimeException("readMapVal: unsupported key or value types");
+            throw new UnsupportedOperationException("readMapVal: unsupported key or value types");
         }
         return objMap;
     }
@@ -307,13 +392,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                         valueType == ColumnVector.Type.TIMESTAMP;
     }
 
-    @SuppressWarnings("unchecked")
-    private List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
-        List<Object> mapList;
+    private Object[] readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
+        Object[] mapList;
         switch (mapVector.type) {
             case BYTES:
                 mapList =
-                        (List<Object>) readBytesListVector(
+                        readBytesListVector(
                                 (BytesColumnVector) mapVector,
                                 childType,
                                 offset,
@@ -331,15 +415,16 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 break;
             case DOUBLE:
                 mapList =
-                        (List<Object>) readDoubleListVector(
+                        readDoubleListVector(
                                 (DoubleColumnVector) mapVector,
+                                childType,
                                 offset,
                                 numValues
                         );
                 break;
             case DECIMAL:
                 mapList =
-                        (List<Object>) readDecimalListVector(
+                        readDecimalListVector(
                                 (DecimalColumnVector) mapVector,
                                 offset,
                                 numValues
@@ -347,7 +432,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 break;
             case TIMESTAMP:
                 mapList =
-                        (List<Object>) readTimestampListVector(
+                        readTimestampListVector(
                                 (TimestampColumnVector) mapVector,
                                 childType,
                                 offset,
@@ -355,7 +440,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                         );
                 break;
             default:
-                throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
+                throw new UnsupportedOperationException(mapVector.type.name() + " is not supported for MapColumnVectors");
         }
         return mapList;
     }
@@ -372,14 +457,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 Object unionValue = readColumn(fieldVector, fieldType, rowNum);
                 columnValuePair = Pair.of(fieldType, unionValue);
             } else {
-                throw new RuntimeException(
-                        "readUnionVal: union tag value out of range for union column vectors"
-                );
+                throw new UnsupportedOperationException("readUnionVal: union tag value out of range for union column vectors");
             }
         } else {
-            throw new RuntimeException(
-                    "readUnionVal: union tag value out of range for union types"
-            );
+            throw new UnsupportedOperationException("readUnionVal: union tag value out of range for union types");
         }
         return columnValuePair;
     }
@@ -395,7 +476,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     listValues = readLongListValues(listVector, childType, rowNum);
                     break;
                 case DOUBLE:
-                    listValues = readDoubleListValues(listVector, rowNum);
+                    listValues = readDoubleListValues(listVector, colType, rowNum);
                     break;
                 case BYTES:
                     listValues = readBytesListValues(listVector, childType, rowNum);
@@ -407,9 +488,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     listValues = readTimestampListValues(listVector, childType, rowNum);
                     break;
                 default:
-                    throw new RuntimeException(
-                            listVector.type.name() + " is not supported for ListColumnVectors"
-                    );
+                    throw new UnsupportedOperationException(listVector.type.name() + " is not supported for ListColumnVectors");
             }
         }
         return listValues;
@@ -422,7 +501,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return readLongListVector(longVector, childType, offset, numValues);
     }
 
-    private List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
+    private Object[] readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
         List<Object> longList = new ArrayList<>();
         for (int i = 0; i < numValues; i++) {
             if (!longVector.isNull[offset + i]) {
@@ -433,6 +512,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 } else if (childType.getCategory() == TypeDescription.Category.INT) {
                     Integer intObj = (int) longVal;
                     longList.add(intObj);
+                } else if (childType.getCategory() == TypeDescription.Category.BYTE) {
+                    Byte byteObj = (byte) longVal;
+                    longList.add(byteObj);
+                } else if (childType.getCategory() == TypeDescription.Category.SHORT) {
+                    Short shortObj = (short) longVal;
+                    longList.add(shortObj);
                 } else {
                     longList.add(longVal);
                 }
@@ -440,27 +525,45 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 longList.add(null);
             }
         }
-        return longList;
+        if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
+            return longList.toArray(new Boolean[0]);
+        } else if (childType.getCategory() == TypeDescription.Category.INT) {
+            return longList.toArray(new Integer[0]);
+        } else if (childType.getCategory() == TypeDescription.Category.BYTE) {
+            return longList.toArray(new Byte[0]);
+        } else if (childType.getCategory() == TypeDescription.Category.SHORT) {
+            return longList.toArray(new Short[0]);
+        } else {
+            return longList.toArray(new Long[0]);
+        }
     }
 
-    private Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
+    private Object readDoubleListValues(ListColumnVector listVector, TypeDescription colType, int rowNum) {
         int offset = (int) listVector.offsets[rowNum];
         int numValues = (int) listVector.lengths[rowNum];
         DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
-        return readDoubleListVector(doubleVec, offset, numValues);
+        return readDoubleListVector(doubleVec, colType, offset, numValues);
     }
 
-    private Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
+    private Object[] readDoubleListVector(DoubleColumnVector doubleVec, TypeDescription colType, int offset, int numValues) {
         List<Object> doubleList = new ArrayList<>();
         for (int i = 0; i < numValues; i++) {
             if (!doubleVec.isNull[offset + i]) {
                 Double doubleVal = doubleVec.vector[offset + i];
-                doubleList.add(doubleVal);
+                if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+                    doubleList.add(doubleVal.floatValue());
+                } else {
+                    doubleList.add(doubleVal);
+                }
             } else {
                 doubleList.add(null);
             }
         }
-        return doubleList;
+        if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+            return doubleList.toArray(new Float[0]);
+        } else {
+            return doubleList.toArray(new Double[0]);
+        }
     }
 
     private Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -470,7 +573,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return readBytesListVector(bytesVec, childType, offset, numValues);
     }
 
-    private Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
+    private Object[] readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
         List<Object> bytesValList = new ArrayList<>();
         for (int i = 0; i < numValues; i++) {
             if (!bytesVec.isNull[offset + i]) {
@@ -488,7 +591,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 bytesValList.add(null);
             }
         }
-        return bytesValList;
+        if (childType.getCategory() == TypeDescription.Category.STRING) {
+            return bytesValList.toArray(new String[0]);
+        } else {
+            return bytesValList.toArray();
+        }
     }
 
     private Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
@@ -498,7 +605,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return readDecimalListVector(decimalVec, offset, numValues);
     }
 
-    private Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
+    private Object[] readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
         List<Object> decimalList = new ArrayList<>();
         for (int i = 0; i < numValues; i++) {
             if (!decimalVector.isNull[offset + i]) {
@@ -508,7 +615,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 decimalList.add(null);
             }
         }
-        return decimalList;
+        return decimalList.toArray(new BigDecimal[0]);
     }
 
     private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -518,7 +625,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return readTimestampListVector(timestampVec, childType, offset, numValues);
     }
 
-    private Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
+    private Object[] readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
         List<Object> timestampList = new ArrayList<>();
         for (int i = 0; i < numValues; i++) {
             if (!timestampVector.isNull[offset + i]) {
@@ -527,16 +634,20 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 Timestamp timestamp = new Timestamp(millis);
                 timestamp.setNanos(nanos);
                 if (childType.getCategory() == TypeDescription.Category.DATE) {
-                    Date date = new Date(timestamp.getTime());
-                    timestampList.add(date);
+                    LocalDate localDate = LocalDate.ofEpochDay(timestamp.getTime());
+                    timestampList.add(localDate);
                 } else {
-                    timestampList.add(timestamp);
+                    timestampList.add(timestamp.toLocalDateTime());
                 }
             } else {
                 timestampList.add(null);
             }
         }
-        return timestampList;
+        if (childType.getCategory() == TypeDescription.Category.DATE) {
+            return timestampList.toArray(new LocalDate[0]);
+        } else {
+            return timestampList.toArray(new LocalDateTime[0]);
+        }
     }
 }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
new file mode 100644
index 000000000..dc32fbe22
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class OrcReadStrategyTest {
+
+    @Test
+    public void testOrcRead() throws Exception {
+        URL resource = OrcReadStrategyTest.class.getResource("/test.orc");
+        assert resource != null;
+        String path = Paths.get(resource.toURI()).toString();
+        OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
+        orcReadStrategy.init(null);
+        TestCollector testCollector = new TestCollector();
+        orcReadStrategy.read(path, testCollector);
+    }
+
+    public static class TestCollector implements Collector<SeaTunnelRow> {
+
+        @SuppressWarnings("checkstyle:RegexpSingleline")
+        @Override
+        public void collect(SeaTunnelRow record) {
+            System.out.println(record);
+            Assertions.assertEquals(record.getField(16).getClass(), SeaTunnelRow.class);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc
new file mode 100644
index 000000000..5f5d5b969
Binary files /dev/null and b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc differ