You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/17 10:30:52 UTC

[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2750: [Bug][Core] Fix the bug that can not convert array and map

ashulin commented on code in PR #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750#discussion_r973567728


##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -213,4 +225,60 @@ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType ro
         }
         return new SeaTunnelRow(fields);
     }
+
+    private static DataType seaTunnelType2SparkType(SeaTunnelDataType<?> seaTunnelDataType) {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
+                return DataTypes.createArrayType(seaTunnelType2SparkType(elementType));
+            case MAP:
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) seaTunnelDataType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
+                return DataTypes.createMapType(seaTunnelType2SparkType(keyType), seaTunnelType2SparkType(valueType));
+            case STRING:
+                return DataTypes.StringType;
+            case BOOLEAN:
+                return DataTypes.BooleanType;
+            case TINYINT:
+                return DataTypes.ByteType;
+            case SMALLINT:
+                return DataTypes.ShortType;
+            case INT:
+                return DataTypes.IntegerType;
+            case BIGINT:
+                return DataTypes.LongType;
+            case FLOAT:
+                return DataTypes.FloatType;
+            case DOUBLE:
+                return DataTypes.DoubleType;
+            case DECIMAL:
+                int precision = ((DecimalType) seaTunnelDataType).getPrecision();
+                int scale = ((DecimalType) seaTunnelDataType).getScale();
+                return DataTypes.createDecimalType(precision, scale);
+            case NULL:
+                return DataTypes.NullType;
+            case BYTES:
+                return DataTypes.BinaryType;
+            case DATE:
+                return DataTypes.DateType;
+            case TIMESTAMP:
+                return DataTypes.TimestampType;
+            case TIME:
+                throw new RuntimeException("SeaTunnel not support time type, it will be supported in the future");
+            case ROW:
+                ArrayList<StructField> structFields = new ArrayList<>();
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+                String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames();
+                for (int i = 0; i < fieldNames.length; i++) {
+                    StructField structField = new StructField(fieldNames[i], seaTunnelType2SparkType(fieldTypes[i]), true, null);
+                    structFields.add(structField);
+                }
+                return DataTypes.createStructType(structFields);
+            default:
+                // do nothing
+                // never get in there
+                return null;
+        }
+    }

Review Comment:
   Please use the existing `TypeConverterUtils#convert(SeaTunnelDataType<?>)` instead of adding a new `seaTunnelType2SparkType` method.



##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -111,23 +128,35 @@ private static InternalRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType r
 
     private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
         if (mapData == null || mapData.size() == 0) {
-            return mapData;
+            return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
         }
-        switch (mapType.getValueType().getSqlType()) {
-            case MAP:
-            case ROW:
-            case DATE:
-            case TIME:
-            case TIMESTAMP:
-                Map<Object, Object> newMap = new HashMap<>(mapData.size());
-                mapData.forEach((key, value) -> {
-                    SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    newMap.put(key, convertFunction.apply(value, valueType));
-                });
-                return newMap;
-            default:
-                return mapData;
+        Map<Object, Object> newMap = new HashMap<>(mapData.size());
+        mapData.forEach((key, value) -> {
+            SeaTunnelDataType<?> keyType = mapType.getKeyType();
+            SeaTunnelDataType<?> valueType = mapType.getValueType();
+            newMap.put(convertFunction.apply(key, keyType), convertFunction.apply(value, valueType));
+        });
+        Object[] keys = newMap.keySet().toArray();
+        Object[] values = newMap.values().toArray();
+        return ArrayBasedMapData.apply(keys, values);
+    }
+
+    private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
+        if (mapData == null || mapData.numElements() == 0) {
+            return Collections.emptyMap();
+        }
+        Map<Object, Object> newMap = new HashMap<>(mapData.numElements());
+        int num = mapData.numElements();
+        SeaTunnelDataType<?> keyType = mapType.getKeyType();
+        SeaTunnelDataType<?> valueType = mapType.getValueType();
+        Object[] keys = mapData.keyArray().toObjectArray(seaTunnelType2SparkType(keyType));
+        Object[] values = mapData.valueArray().toObjectArray(seaTunnelType2SparkType(valueType));
+        for (int i = 0; i < num; i++) {
+            keys[i] = convertFunction.apply(keys[i], keyType);
+            values[i] = convertFunction.apply(values[i], valueType);
+            newMap.put(keys[i], values[i]);
         }

Review Comment:
   Couldn't this be written as follows instead?
   
   ```java
           Map<Object, Object> newMap = new HashMap<>(mapData.size());
           mapData.forEach((key, value) -> {
               SeaTunnelDataType<?> keyType = mapType.getKeyType();
               SeaTunnelDataType<?> valueType = mapType.getValueType();
               newMap.put(convertFunction.apply(key, keyType), convertFunction.apply(value, valueType));
           });
   ```



##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -111,23 +128,35 @@ private static InternalRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType r
 
     private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
         if (mapData == null || mapData.size() == 0) {
-            return mapData;
+            return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
         }
-        switch (mapType.getValueType().getSqlType()) {
-            case MAP:
-            case ROW:
-            case DATE:
-            case TIME:
-            case TIMESTAMP:
-                Map<Object, Object> newMap = new HashMap<>(mapData.size());
-                mapData.forEach((key, value) -> {
-                    SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    newMap.put(key, convertFunction.apply(value, valueType));
-                });
-                return newMap;
-            default:
-                return mapData;
+        Map<Object, Object> newMap = new HashMap<>(mapData.size());
+        mapData.forEach((key, value) -> {
+            SeaTunnelDataType<?> keyType = mapType.getKeyType();
+            SeaTunnelDataType<?> valueType = mapType.getValueType();
+            newMap.put(convertFunction.apply(key, keyType), convertFunction.apply(value, valueType));
+        });
+        Object[] keys = newMap.keySet().toArray();
+        Object[] values = newMap.values().toArray();
+        return ArrayBasedMapData.apply(keys, values);
+    }
+
+    private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
+        if (mapData == null || mapData.numElements() == 0) {
+            return Collections.emptyMap();
+        }
+        Map<Object, Object> newMap = new HashMap<>(mapData.numElements());
+        int num = mapData.numElements();
+        SeaTunnelDataType<?> keyType = mapType.getKeyType();
+        SeaTunnelDataType<?> valueType = mapType.getValueType();
+        Object[] keys = mapData.keyArray().toObjectArray(seaTunnelType2SparkType(keyType));
+        Object[] values = mapData.valueArray().toObjectArray(seaTunnelType2SparkType(valueType));
+        for (int i = 0; i < num; i++) {
+            keys[i] = convertFunction.apply(keys[i], keyType);
+            values[i] = convertFunction.apply(values[i], valueType);
+            newMap.put(keys[i], values[i]);
         }

Review Comment:
   If not, please remove the `convertFunction` parameter from `#convertMap` and `#reconvertMap`. It existed only so that a single method could serve both convert and reconvert, and that is no longer the case now that they are separate methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org