You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/09/20 12:29:53 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2] Improve orc read strategy (#2747)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new af34beda3 [Improve][Connector-V2] Improve orc read strategy (#2747)
af34beda3 is described below
commit af34beda376e49ffa376bfb07bf2222241557638
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Tue Sep 20 20:29:47 2022 +0800
[Improve][Connector-V2] Improve orc read strategy (#2747)
* [Improve][Connector-V2] Improve ORC reading
---
.../file/source/reader/OrcReadStrategy.java | 217 ++++++++++++++++-----
.../seatunnel/file/writer/OrcReadStrategyTest.java | 58 ++++++
.../src/test/resources/test.orc | Bin 0 -> 1959 bytes
3 files changed, 222 insertions(+), 53 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 209965b63..3ebffb5a1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -18,7 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -52,9 +57,10 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -112,7 +118,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[schema.getFieldNames().size()];
for (int i = 0; i < schema.getFieldNames().size(); i++) {
fields[i] = schema.getFieldNames().get(i);
- types[i] = BasicType.STRING_TYPE;
+ types[i] = orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
}
seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
return seaTunnelRowTypeInfo;
@@ -159,7 +165,78 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return checkResult;
} catch (FilePluginException | IOException e) {
String errorMsg = String.format("Check orc file [%s] error", path);
- throw new RuntimeException(errorMsg, e);
+ throw new UnsupportedOperationException(errorMsg, e);
+ }
+ }
+
+ private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription typeDescription) {
+ switch (typeDescription.getCategory()) {
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case INT:
+ return BasicType.INT_TYPE;
+ case BYTE:
+ return BasicType.BYTE_TYPE;
+ case SHORT:
+ return BasicType.SHORT_TYPE;
+ case LONG:
+ return BasicType.LONG_TYPE;
+ case FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case BINARY:
+ return PrimitiveByteArrayType.INSTANCE;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ return BasicType.STRING_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case DECIMAL:
+ int precision = typeDescription.getPrecision();
+ int scale = typeDescription.getScale();
+ return new DecimalType(precision, scale);
+ case LIST:
+ TypeDescription listType = typeDescription.getChildren().get(0);
+ SeaTunnelDataType<?> seaTunnelDataType = orcDataType2SeaTunnelDataType(listType);
+ switch (seaTunnelDataType.getSqlType()) {
+ case STRING:
+ return ArrayType.STRING_ARRAY_TYPE;
+ case BOOLEAN:
+ return ArrayType.BOOLEAN_ARRAY_TYPE;
+ case TINYINT:
+ return ArrayType.BYTE_ARRAY_TYPE;
+ case SMALLINT:
+ return ArrayType.SHORT_ARRAY_TYPE;
+ case INT:
+ return ArrayType.INT_ARRAY_TYPE;
+ case BIGINT:
+ return ArrayType.LONG_ARRAY_TYPE;
+ case FLOAT:
+ return ArrayType.FLOAT_ARRAY_TYPE;
+ case DOUBLE:
+ return ArrayType.DOUBLE_ARRAY_TYPE;
+ default:
+ String errorMsg = String.format("SeaTunnel array type not supported this genericType [%s] yet", seaTunnelDataType);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+ case MAP:
+ TypeDescription keyType = typeDescription.getChildren().get(0);
+ TypeDescription valueType = typeDescription.getChildren().get(1);
+ return new MapType<>(orcDataType2SeaTunnelDataType(keyType), orcDataType2SeaTunnelDataType(valueType));
+ case STRUCT:
+ List<TypeDescription> children = typeDescription.getChildren();
+ String[] fieldNames = typeDescription.getFieldNames().toArray(new String[0]);
+ SeaTunnelDataType<?>[] fieldTypes = children.stream().map(this::orcDataType2SeaTunnelDataType).toArray(SeaTunnelDataType<?>[]::new);
+ return new SeaTunnelRowType(fieldNames, fieldTypes);
+ default:
+ // Reached for ORC categories with no SeaTunnel mapping yet, e.g. UNION
+ // (or TIMESTAMP_INSTANT in newer ORC versions) -- fail fast rather than guess.
+ String errorMsg = String.format("SeaTunnel file connector not supported this orc type [%s] yet", typeDescription.getCategory());
+ throw new UnsupportedOperationException(errorMsg);
}
}
@@ -172,9 +249,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
break;
case DOUBLE:
columnObj = ((DoubleColumnVector) colVec).vector[rowNum];
+ if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+ columnObj = ((Double) columnObj).floatValue();
+ }
break;
case BYTES:
- columnObj = readBytesVal(colVec, rowNum);
+ columnObj = readBytesVal(colVec, colType, rowNum);
break;
case DECIMAL:
columnObj = readDecimalVal(colVec, rowNum);
@@ -195,9 +275,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
columnObj = readUnionVal(colVec, colType, rowNum);
break;
default:
- throw new RuntimeException(
- "ReadColumn: unsupported ORC file column type: " + colVec.type.name()
- );
+ throw new UnsupportedOperationException("ReadColumn: unsupported ORC file column type: " + colVec.type.name());
}
}
return columnObj;
@@ -214,17 +292,24 @@ public class OrcReadStrategy extends AbstractReadStrategy {
} else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
} else if (colType.getCategory() == TypeDescription.Category.DATE) {
- colObj = new Date(longVal);
+ colObj = LocalDate.ofEpochDay(longVal);
+ } else if (colType.getCategory() == TypeDescription.Category.BYTE) {
+ colObj = (byte) longVal;
+ } else if (colType.getCategory() == TypeDescription.Category.SHORT) {
+ colObj = (short) longVal;
}
}
return colObj;
}
- private Object readBytesVal(ColumnVector colVec, int rowNum) {
+ private Object readBytesVal(ColumnVector colVec, TypeDescription typeDescription, int rowNum) {
Object bytesObj = null;
if (!colVec.isNull[rowNum]) {
BytesColumnVector bytesVector = (BytesColumnVector) colVec;
bytesObj = bytesVector.toString(rowNum);
+ if (typeDescription.getCategory() == TypeDescription.Category.BINARY) {
+ bytesObj = ((String) bytesObj).getBytes();
+ }
}
return bytesObj;
}
@@ -246,9 +331,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
long millis = timestampVec.time[rowNum];
Timestamp timestamp = new Timestamp(millis);
timestamp.setNanos(nanos);
- timestampVal = timestamp;
+ timestampVal = timestamp.toLocalDateTime();
if (colType.getCategory() == TypeDescription.Category.DATE) {
- timestampVal = new Date(timestamp.getTime());
+ timestampVal = LocalDate.ofEpochDay(timestamp.getTime());
}
}
return timestampVal;
@@ -257,15 +342,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object structObj = null;
if (!colVec.isNull[rowNum]) {
- List<Object> fieldValList = new ArrayList<>();
StructColumnVector structVector = (StructColumnVector) colVec;
ColumnVector[] fieldVec = structVector.fields;
+ Object[] fieldValues = new Object[fieldVec.length];
List<TypeDescription> fieldTypes = colType.getChildren();
for (int i = 0; i < fieldVec.length; i++) {
Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), rowNum);
- fieldValList.add(fieldObj);
+ fieldValues[i] = fieldObj;
}
- structObj = fieldValList;
+ structObj = new SeaTunnelRow(fieldValues);
}
return structObj;
}
@@ -281,13 +366,13 @@ public class OrcReadStrategy extends AbstractReadStrategy {
TypeDescription valueType = mapTypes.get(1);
ColumnVector keyChild = mapVector.keys;
ColumnVector valueChild = mapVector.values;
- List<Object> keyList = readMapVector(keyChild, keyType, offset, mapSize);
- List<Object> valueList = readMapVector(valueChild, valueType, offset, mapSize);
- for (int i = 0; i < keyList.size(); i++) {
- objMap.put(keyList.get(i), valueList.get(i));
+ Object[] keyList = readMapVector(keyChild, keyType, offset, mapSize);
+ Object[] valueList = readMapVector(valueChild, valueType, offset, mapSize);
+ for (int i = 0; i < keyList.length; i++) {
+ objMap.put(keyList[i], valueList[i]);
}
} else {
- throw new RuntimeException("readMapVal: unsupported key or value types");
+ throw new UnsupportedOperationException("readMapVal: unsupported key or value types");
}
return objMap;
}
@@ -307,13 +392,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
valueType == ColumnVector.Type.TIMESTAMP;
}
- @SuppressWarnings("unchecked")
- private List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
- List<Object> mapList;
+ private Object[] readMapVector(ColumnVector mapVector, TypeDescription childType, int offset, int numValues) {
+ Object[] mapList;
switch (mapVector.type) {
case BYTES:
mapList =
- (List<Object>) readBytesListVector(
+ readBytesListVector(
(BytesColumnVector) mapVector,
childType,
offset,
@@ -331,15 +415,16 @@ public class OrcReadStrategy extends AbstractReadStrategy {
break;
case DOUBLE:
mapList =
- (List<Object>) readDoubleListVector(
+ readDoubleListVector(
(DoubleColumnVector) mapVector,
+ childType,
offset,
numValues
);
break;
case DECIMAL:
mapList =
- (List<Object>) readDecimalListVector(
+ readDecimalListVector(
(DecimalColumnVector) mapVector,
offset,
numValues
@@ -347,7 +432,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
break;
case TIMESTAMP:
mapList =
- (List<Object>) readTimestampListVector(
+ readTimestampListVector(
(TimestampColumnVector) mapVector,
childType,
offset,
@@ -355,7 +440,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
);
break;
default:
- throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
+ throw new UnsupportedOperationException(mapVector.type.name() + " is not supported for MapColumnVectors");
}
return mapList;
}
@@ -372,14 +457,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
Object unionValue = readColumn(fieldVector, fieldType, rowNum);
columnValuePair = Pair.of(fieldType, unionValue);
} else {
- throw new RuntimeException(
- "readUnionVal: union tag value out of range for union column vectors"
- );
+ throw new UnsupportedOperationException("readUnionVal: union tag value out of range for union column vectors");
}
} else {
- throw new RuntimeException(
- "readUnionVal: union tag value out of range for union types"
- );
+ throw new UnsupportedOperationException("readUnionVal: union tag value out of range for union types");
}
return columnValuePair;
}
@@ -395,7 +476,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
listValues = readLongListValues(listVector, childType, rowNum);
break;
case DOUBLE:
- listValues = readDoubleListValues(listVector, rowNum);
+ listValues = readDoubleListValues(listVector, colType, rowNum);
break;
case BYTES:
listValues = readBytesListValues(listVector, childType, rowNum);
@@ -407,9 +488,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
listValues = readTimestampListValues(listVector, childType, rowNum);
break;
default:
- throw new RuntimeException(
- listVector.type.name() + " is not supported for ListColumnVectors"
- );
+ throw new UnsupportedOperationException(listVector.type.name() + " is not supported for ListColumnVectors");
}
}
return listValues;
@@ -422,7 +501,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return readLongListVector(longVector, childType, offset, numValues);
}
- private List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
+ private Object[] readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset, int numValues) {
List<Object> longList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!longVector.isNull[offset + i]) {
@@ -433,6 +512,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
} else if (childType.getCategory() == TypeDescription.Category.INT) {
Integer intObj = (int) longVal;
longList.add(intObj);
+ } else if (childType.getCategory() == TypeDescription.Category.BYTE) {
+ Byte byteObj = (byte) longVal;
+ longList.add(byteObj);
+ } else if (childType.getCategory() == TypeDescription.Category.SHORT) {
+ Short shortObj = (short) longVal;
+ longList.add(shortObj);
} else {
longList.add(longVal);
}
@@ -440,27 +525,45 @@ public class OrcReadStrategy extends AbstractReadStrategy {
longList.add(null);
}
}
- return longList;
+ if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
+ return longList.toArray(new Boolean[0]);
+ } else if (childType.getCategory() == TypeDescription.Category.INT) {
+ return longList.toArray(new Integer[0]);
+ } else if (childType.getCategory() == TypeDescription.Category.BYTE) {
+ return longList.toArray(new Byte[0]);
+ } else if (childType.getCategory() == TypeDescription.Category.SHORT) {
+ return longList.toArray(new Short[0]);
+ } else {
+ return longList.toArray(new Long[0]);
+ }
}
- private Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
+ private Object readDoubleListValues(ListColumnVector listVector, TypeDescription colType, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
- return readDoubleListVector(doubleVec, offset, numValues);
+ return readDoubleListVector(doubleVec, colType, offset, numValues);
}
- private Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
+ private Object[] readDoubleListVector(DoubleColumnVector doubleVec, TypeDescription colType, int offset, int numValues) {
List<Object> doubleList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!doubleVec.isNull[offset + i]) {
Double doubleVal = doubleVec.vector[offset + i];
- doubleList.add(doubleVal);
+ if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+ doubleList.add(doubleVal.floatValue());
+ } else {
+ doubleList.add(doubleVal);
+ }
} else {
doubleList.add(null);
}
}
- return doubleList;
+ if (colType.getCategory() == TypeDescription.Category.FLOAT) {
+ return doubleList.toArray(new Float[0]);
+ } else {
+ return doubleList.toArray(new Double[0]);
+ }
}
private Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -470,7 +573,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return readBytesListVector(bytesVec, childType, offset, numValues);
}
- private Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
+ private Object[] readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset, int numValues) {
List<Object> bytesValList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!bytesVec.isNull[offset + i]) {
@@ -488,7 +591,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
bytesValList.add(null);
}
}
- return bytesValList;
+ if (childType.getCategory() == TypeDescription.Category.STRING) {
+ return bytesValList.toArray(new String[0]);
+ } else {
+ return bytesValList.toArray();
+ }
}
private Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
@@ -498,7 +605,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return readDecimalListVector(decimalVec, offset, numValues);
}
- private Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
+ private Object[] readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues) {
List<Object> decimalList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!decimalVector.isNull[offset + i]) {
@@ -508,7 +615,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
decimalList.add(null);
}
}
- return decimalList;
+ return decimalList.toArray(new BigDecimal[0]);
}
private Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
@@ -518,7 +625,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return readTimestampListVector(timestampVec, childType, offset, numValues);
}
- private Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
+ private Object[] readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType, int offset, int numValues) {
List<Object> timestampList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!timestampVector.isNull[offset + i]) {
@@ -527,16 +634,20 @@ public class OrcReadStrategy extends AbstractReadStrategy {
Timestamp timestamp = new Timestamp(millis);
timestamp.setNanos(nanos);
if (childType.getCategory() == TypeDescription.Category.DATE) {
- Date date = new Date(timestamp.getTime());
- timestampList.add(date);
+ LocalDate localDate = LocalDate.ofEpochDay(timestamp.getTime());
+ timestampList.add(localDate);
} else {
- timestampList.add(timestamp);
+ timestampList.add(timestamp.toLocalDateTime());
}
} else {
timestampList.add(null);
}
}
- return timestampList;
+ if (childType.getCategory() == TypeDescription.Category.DATE) {
+ return timestampList.toArray(new LocalDate[0]);
+ } else {
+ return timestampList.toArray(new LocalDateTime[0]);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
new file mode 100644
index 000000000..dc32fbe22
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class OrcReadStrategyTest {
+
+    @Test
+    public void testOrcRead() throws Exception {
+        URL resource = OrcReadStrategyTest.class.getResource("/test.orc");
+        Assertions.assertNotNull(resource, "test.orc must be on the test classpath");
+        String path = Paths.get(resource.toURI()).toString();
+        OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
+        orcReadStrategy.init(null);
+        TestCollector testCollector = new TestCollector();
+        orcReadStrategy.read(path, testCollector);
+        Assertions.assertTrue(testCollector.rows > 0, "expected test.orc to yield at least one row");
+    }
+
+    /** Counts collected rows and checks field 16 (presumably a struct column) is read as a nested SeaTunnelRow. */
+    public static class TestCollector implements Collector<SeaTunnelRow> {
+        private int rows;
+        @Override
+        public void collect(SeaTunnelRow record) {
+            rows++;
+            Assertions.assertEquals(SeaTunnelRow.class, record.getField(16).getClass());
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc
new file mode 100644
index 000000000..5f5d5b969
Binary files /dev/null and b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.orc differ