You are viewing a plain text version of this content. The canonical link for it was a hyperlink that was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 13:08:07 UTC
[incubator-seatunnel] branch dev updated: [Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 28fb446dd [Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)
28fb446dd is described below
commit 28fb446ddb41e269836419f26ef05980c9ec960a
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Thu Aug 25 21:08:00 2022 +0800
[Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)
* array transfer for spark
* checkstyle
---
.../src/main/resources/examples/spark.batch.conf | 4 +--
.../translation/serialization/RowConverter.java | 2 +-
.../flink/serialization/FlinkRowConverter.java | 2 +-
.../translation/flink/sink/FlinkSinkWriter.java | 2 +-
.../translation/spark/sink/SparkDataWriter.java | 2 +-
.../common/serialization/InternalRowConverter.java | 32 ++++++++++++++++++++--
6 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 6e7755d8c..25ae6b567 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -36,7 +36,7 @@ source {
schema = {
fields {
c_map = "map<string, string>"
- c_array = "array<tinyint>"
+ c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
@@ -72,7 +72,7 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
+ sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
result_table_name = "sql"
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
index f642cfe45..95a1b96b6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
@@ -124,5 +124,5 @@ public abstract class RowConverter<T> {
*
* @throws IOException Thrown, if the conversion fails.
*/
- public abstract SeaTunnelRow convert(T engineRow) throws IOException;
+ public abstract SeaTunnelRow reconvert(T engineRow) throws IOException;
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index 89e56f3d3..0eba14b52 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -86,7 +86,7 @@ public class FlinkRowConverter extends RowConverter<Row> {
}
@Override
- public SeaTunnelRow convert(Row engineRow) throws IOException {
+ public SeaTunnelRow reconvert(Row engineRow) throws IOException {
return (SeaTunnelRow) reconvert(engineRow, dataType);
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 5e221a289..beb5976a0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -48,7 +48,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
@Override
public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context) throws IOException {
if (element instanceof Row) {
- sinkWriter.write(rowSerialization.convert((Row) element));
+ sinkWriter.write(rowSerialization.reconvert((Row) element));
} else {
throw new InvalidClassException("only support Flink Row at now, the element Class is " + element.getClass());
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index cc42bb1a6..6b65eede7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -55,7 +55,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
@Override
public void write(InternalRow record) throws IOException {
- sinkWriter.write(rowConverter.convert(record));
+ sinkWriter.write(rowConverter.reconvert(record));
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 0224654d6..9facf8fe3 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.spark.common.serialization;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -35,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.MutableLong;
import org.apache.spark.sql.catalyst.expressions.MutableShort;
import org.apache.spark.sql.catalyst.expressions.MutableValue;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
@@ -84,6 +87,8 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
return UTF8String.fromString((String) field);
case DECIMAL:
return Decimal.apply((BigDecimal) field);
+ case ARRAY:
+ return ArrayData.toArrayData(field);
default:
return field;
}
@@ -146,11 +151,11 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
}
@Override
- public SeaTunnelRow convert(InternalRow engineRow) throws IOException {
+ public SeaTunnelRow reconvert(InternalRow engineRow) throws IOException {
return (SeaTunnelRow) reconvert(engineRow, dataType);
}
- public static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
+ private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
if (field == null) {
return null;
}
@@ -170,12 +175,33 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
return field.toString();
case DECIMAL:
return ((Decimal) field).toJavaBigDecimal();
+ case ARRAY:
+ ArrayData arrayData = (ArrayData) field;
+ BasicType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
+ switch (elementType.getSqlType()) {
+ case INT:
+ return arrayData.toIntArray();
+ case TINYINT:
+ return arrayData.toByteArray();
+ case SMALLINT:
+ return arrayData.toShortArray();
+ case BIGINT:
+ return arrayData.toLongArray();
+ case BOOLEAN:
+ return arrayData.toBooleanArray();
+ case FLOAT:
+ return arrayData.toFloatArray();
+ case DOUBLE:
+ return arrayData.toDoubleArray();
+ default:
+ return arrayData.array();
+ }
default:
return field;
}
}
- public static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType rowType) {
+ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType rowType) {
Object[] fields = new Object[engineRow.numFields()];
for (int i = 0; i < engineRow.numFields(); i++) {
fields[i] = reconvert(engineRow.get(i, TypeConverterUtils.convert(rowType.getFieldType(i))),