You are viewing a plain-text version of this content. The canonical version lives at the original mail-archive link, which was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/25 13:08:07 UTC

[incubator-seatunnel] branch dev updated: [Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 28fb446dd [Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)
28fb446dd is described below

commit 28fb446ddb41e269836419f26ef05980c9ec960a
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Thu Aug 25 21:08:00 2022 +0800

    [Bug][Core] fix ArrayType convert for spark2.4.0 and improve the convert method name (#2518)
    
    * array transfer for spark
    
    * checkstyle
---
 .../src/main/resources/examples/spark.batch.conf   |  4 +--
 .../translation/serialization/RowConverter.java    |  2 +-
 .../flink/serialization/FlinkRowConverter.java     |  2 +-
 .../translation/flink/sink/FlinkSinkWriter.java    |  2 +-
 .../translation/spark/sink/SparkDataWriter.java    |  2 +-
 .../common/serialization/InternalRowConverter.java | 32 ++++++++++++++++++++--
 6 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 6e7755d8c..25ae6b567 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -36,7 +36,7 @@ source {
     schema = {
       fields {
         c_map = "map<string, string>"
-        c_array = "array<tinyint>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
         c_tinyint = tinyint
@@ -72,7 +72,7 @@ transform {
 
   # you can also use other transform plugins, such as sql
   sql {
-    sql = "select c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
+    sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
     result_table_name = "sql"
   }
 
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
index f642cfe45..95a1b96b6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
@@ -124,5 +124,5 @@ public abstract class RowConverter<T> {
      *
      * @throws IOException Thrown, if the conversion fails.
      */
-    public abstract SeaTunnelRow convert(T engineRow) throws IOException;
+    public abstract SeaTunnelRow reconvert(T engineRow) throws IOException;
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index 89e56f3d3..0eba14b52 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -86,7 +86,7 @@ public class FlinkRowConverter extends RowConverter<Row> {
     }
 
     @Override
-    public SeaTunnelRow convert(Row engineRow) throws IOException {
+    public SeaTunnelRow reconvert(Row engineRow) throws IOException {
         return (SeaTunnelRow) reconvert(engineRow, dataType);
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 5e221a289..beb5976a0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -48,7 +48,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
     @Override
     public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context) throws IOException {
         if (element instanceof Row) {
-            sinkWriter.write(rowSerialization.convert((Row) element));
+            sinkWriter.write(rowSerialization.reconvert((Row) element));
         } else {
             throw new InvalidClassException("only support Flink Row at now, the element Class is " + element.getClass());
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index cc42bb1a6..6b65eede7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -55,7 +55,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
 
     @Override
     public void write(InternalRow record) throws IOException {
-        sinkWriter.write(rowConverter.convert(record));
+        sinkWriter.write(rowConverter.reconvert(record));
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 0224654d6..9facf8fe3 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.spark.common.serialization;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -35,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.MutableLong;
 import org.apache.spark.sql.catalyst.expressions.MutableShort;
 import org.apache.spark.sql.catalyst.expressions.MutableValue;
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -84,6 +87,8 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
                 return UTF8String.fromString((String) field);
             case DECIMAL:
                 return Decimal.apply((BigDecimal) field);
+            case ARRAY:
+                return ArrayData.toArrayData(field);
             default:
                 return field;
         }
@@ -146,11 +151,11 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
     }
 
     @Override
-    public SeaTunnelRow convert(InternalRow engineRow) throws IOException {
+    public SeaTunnelRow reconvert(InternalRow engineRow) throws IOException {
         return (SeaTunnelRow) reconvert(engineRow, dataType);
     }
 
-    public static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
+    private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
         if (field == null) {
             return null;
         }
@@ -170,12 +175,33 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
                 return field.toString();
             case DECIMAL:
                 return ((Decimal) field).toJavaBigDecimal();
+            case ARRAY:
+                ArrayData arrayData = (ArrayData) field;
+                BasicType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
+                switch (elementType.getSqlType()) {
+                    case INT:
+                        return arrayData.toIntArray();
+                    case TINYINT:
+                        return arrayData.toByteArray();
+                    case SMALLINT:
+                        return arrayData.toShortArray();
+                    case BIGINT:
+                        return arrayData.toLongArray();
+                    case BOOLEAN:
+                        return arrayData.toBooleanArray();
+                    case FLOAT:
+                        return arrayData.toFloatArray();
+                    case DOUBLE:
+                        return arrayData.toDoubleArray();
+                    default:
+                        return arrayData.array();
+                }
             default:
                 return field;
         }
     }
 
-    public static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType rowType) {
+    private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType rowType) {
         Object[] fields = new Object[engineRow.numFields()];
         for (int i = 0; i < engineRow.numFields(); i++) {
             fields[i] = reconvert(engineRow.get(i, TypeConverterUtils.convert(rowType.getFieldType(i))),