Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/15 18:32:22 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer opened a new pull request, #2750: [Bug][Core] Fix the bug that can not convert array and map

TyrantLucifer opened a new pull request, #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750

   ## Purpose of this pull request
   
   Fix the bug where array and map types cannot be converted.
   
   ## Check list
   
   * [x] Code changes are covered with tests, or do not need tests for this reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2750: [Bug][Core] Fix the bug that can not convert array and map

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750#issuecomment-1250051499

   Hi, I fixed the map conversion in #2767. Can you copy that code into your PR, or commit your array change to my PR? Thanks!




[GitHub] [incubator-seatunnel] ashulin merged pull request #2750: [Bug][Core] Fix the bug that can not convert array and map

Posted by GitBox <gi...@apache.org>.
ashulin merged PR #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750




[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2750: [Bug][Core] Fix the bug that can not convert array and map

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750#discussion_r973567728


##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -213,4 +225,60 @@ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType ro
         }
         return new SeaTunnelRow(fields);
     }
+
+    private static DataType seaTunnelType2SparkType(SeaTunnelDataType<?> seaTunnelDataType) {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
+                return DataTypes.createArrayType(seaTunnelType2SparkType(elementType));
+            case MAP:
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) seaTunnelDataType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
+                return DataTypes.createMapType(seaTunnelType2SparkType(keyType), seaTunnelType2SparkType(valueType));
+            case STRING:
+                return DataTypes.StringType;
+            case BOOLEAN:
+                return DataTypes.BooleanType;
+            case TINYINT:
+                return DataTypes.ByteType;
+            case SMALLINT:
+                return DataTypes.ShortType;
+            case INT:
+                return DataTypes.IntegerType;
+            case BIGINT:
+                return DataTypes.LongType;
+            case FLOAT:
+                return DataTypes.FloatType;
+            case DOUBLE:
+                return DataTypes.DoubleType;
+            case DECIMAL:
+                int precision = ((DecimalType) seaTunnelDataType).getPrecision();
+                int scale = ((DecimalType) seaTunnelDataType).getScale();
+                return DataTypes.createDecimalType(precision, scale);
+            case NULL:
+                return DataTypes.NullType;
+            case BYTES:
+                return DataTypes.BinaryType;
+            case DATE:
+                return DataTypes.DateType;
+            case TIMESTAMP:
+                return DataTypes.TimestampType;
+            case TIME:
+                throw new RuntimeException("SeaTunnel not support time type, it will be supported in the future");
+            case ROW:
+                ArrayList<StructField> structFields = new ArrayList<>();
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+                String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames();
+                for (int i = 0; i < fieldNames.length; i++) {
+                    StructField structField = new StructField(fieldNames[i], seaTunnelType2SparkType(fieldTypes[i]), true, null);
+                    structFields.add(structField);
+                }
+                return DataTypes.createStructType(structFields);
+            default:
+                // do nothing
+                // never get in there
+                return null;
+        }
+    }

Review Comment:
   Please use `TypeConverterUtils#convert(SeaTunnelDataType<?>)`
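For context on this comment: the hunk above adds a second recursive type mapping next to the existing `TypeConverterUtils#convert`. The sketch below is plain Java with hypothetical stand-in types (`SrcType`, `Basic`, `ArrayOf`, `MapOf`) and a hypothetical `toTargetName` method; none of these come from SeaTunnel or Spark. It only illustrates the shape of a single shared recursive converter, and it assumes that failing fast on an unsupported type is preferable to returning `null`.

```java
final class TypeMappingSketch {
    // Hypothetical source-side type model (not SeaTunnel's real classes).
    interface SrcType {}

    enum Basic implements SrcType { STRING, INT, BIGINT, BOOLEAN }

    static final class ArrayOf implements SrcType {
        final SrcType element;
        ArrayOf(SrcType element) { this.element = element; }
    }

    static final class MapOf implements SrcType {
        final SrcType key;
        final SrcType value;
        MapOf(SrcType key, SrcType value) { this.key = key; this.value = value; }
    }

    // One recursive entry point: nested types are resolved by calling back
    // into the same method, so every caller shares a single mapping.
    static String toTargetName(SrcType type) {
        if (type instanceof ArrayOf) {
            return "array<" + toTargetName(((ArrayOf) type).element) + ">";
        }
        if (type instanceof MapOf) {
            MapOf map = (MapOf) type;
            return "map<" + toTargetName(map.key) + "," + toTargetName(map.value) + ">";
        }
        if (!(type instanceof Basic)) {
            throw new IllegalArgumentException("Unsupported type: " + type);
        }
        switch ((Basic) type) {
            case STRING:  return "string";
            case INT:     return "int";
            case BIGINT:  return "bigint";
            case BOOLEAN: return "boolean";
            default:
                // Fail fast instead of returning null for an unmapped type.
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        SrcType nested = new MapOf(Basic.STRING, new ArrayOf(Basic.INT));
        System.out.println(toTargetName(nested)); // prints map<string,array<int>>
    }
}
```

Keeping one such method means a newly supported type only has to be added in one place.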



##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -111,23 +128,35 @@ private static InternalRow convert(SeaTunnelRow seaTunnelRow, SeaTunnelRowType r
 
     private static Object convertMap(Map<?, ?> mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
         if (mapData == null || mapData.size() == 0) {
-            return mapData;
+            return ArrayBasedMapData.apply(new Object[]{}, new Object[]{});
         }
-        switch (mapType.getValueType().getSqlType()) {
-            case MAP:
-            case ROW:
-            case DATE:
-            case TIME:
-            case TIMESTAMP:
-                Map<Object, Object> newMap = new HashMap<>(mapData.size());
-                mapData.forEach((key, value) -> {
-                    SeaTunnelDataType<?> valueType = mapType.getValueType();
-                    newMap.put(key, convertFunction.apply(value, valueType));
-                });
-                return newMap;
-            default:
-                return mapData;
+        Map<Object, Object> newMap = new HashMap<>(mapData.size());
+        mapData.forEach((key, value) -> {
+            SeaTunnelDataType<?> keyType = mapType.getKeyType();
+            SeaTunnelDataType<?> valueType = mapType.getValueType();
+            newMap.put(convertFunction.apply(key, keyType), convertFunction.apply(value, valueType));
+        });
+        Object[] keys = newMap.keySet().toArray();
+        Object[] values = newMap.values().toArray();
+        return ArrayBasedMapData.apply(keys, values);
+    }
+
+    private static Object reconvertMap(MapData mapData, MapType<?, ?> mapType, BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
+        if (mapData == null || mapData.numElements() == 0) {
+            return Collections.emptyMap();
+        }
+        Map<Object, Object> newMap = new HashMap<>(mapData.numElements());
+        int num = mapData.numElements();
+        SeaTunnelDataType<?> keyType = mapType.getKeyType();
+        SeaTunnelDataType<?> valueType = mapType.getValueType();
+        Object[] keys = mapData.keyArray().toObjectArray(seaTunnelType2SparkType(keyType));
+        Object[] values = mapData.valueArray().toObjectArray(seaTunnelType2SparkType(valueType));
+        for (int i = 0; i < num; i++) {
+            keys[i] = convertFunction.apply(keys[i], keyType);
+            values[i] = convertFunction.apply(values[i], valueType);
+            newMap.put(keys[i], values[i]);
         }

Review Comment:
   Couldn't this be written like the following?
   
   ```java
           Map<Object, Object> newMap = new HashMap<>(mapData.size());
           mapData.forEach((key, value) -> {
               SeaTunnelDataType<?> keyType = mapType.getKeyType();
               SeaTunnelDataType<?> valueType = mapType.getValueType();
               newMap.put(convertFunction.apply(key, keyType), convertFunction.apply(value, valueType));
           });
   ```



Review Comment:
   If that is not possible, please remove the `convertFunction` parameter from `#convertMap` and `#reconvertMap`. It was meant to let one method serve both convert and reconvert, which is no longer needed now that they are separate methods.
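The two directions discussed above differ in their input: `convertMap` starts from a `java.util.Map`, while `reconvertMap` reads the keys and values by index from `keyArray()` and `valueArray()`. The following is a minimal plain-Java sketch of that round trip. `PairedArrays`, `toPaired`, and `fromPaired` are hypothetical names used only for illustration; `PairedArrays` stands in for an engine-side map held as two parallel arrays and is not Spark's class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class ParallelArrayMapSketch {
    // Hypothetical stand-in for a map stored as two parallel arrays.
    static final class PairedArrays {
        final Object[] keys;
        final Object[] values;

        PairedArrays(Object[] keys, Object[] values) {
            if (keys.length != values.length) {
                throw new IllegalArgumentException("keys and values must have the same length");
            }
            this.keys = keys;
            this.values = values;
        }

        int numElements() {
            return keys.length;
        }
    }

    // Map -> parallel arrays: one pass over the entries keeps each key
    // aligned with its value without building an intermediate map.
    static PairedArrays toPaired(Map<?, ?> source,
                                 Function<Object, Object> keyFn,
                                 Function<Object, Object> valueFn) {
        if (source == null || source.isEmpty()) {
            return new PairedArrays(new Object[0], new Object[0]);
        }
        Object[] keys = new Object[source.size()];
        Object[] values = new Object[source.size()];
        int i = 0;
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            keys[i] = keyFn.apply(entry.getKey());
            values[i] = valueFn.apply(entry.getValue());
            i++;
        }
        return new PairedArrays(keys, values);
    }

    // Parallel arrays -> Map: there is no entry view here, so iterate by index.
    static Map<Object, Object> fromPaired(PairedArrays paired,
                                          Function<Object, Object> keyFn,
                                          Function<Object, Object> valueFn) {
        Map<Object, Object> result = new LinkedHashMap<>();
        if (paired == null) {
            return result;
        }
        for (int i = 0; i < paired.numElements(); i++) {
            result.put(keyFn.apply(paired.keys[i]), valueFn.apply(paired.values[i]));
        }
        return result;
    }
}
```

In this sketch the per-element conversion is passed in as plain functions, so each direction receives its own converter rather than sharing one `convertFunction`.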





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2750: [Bug][Core] Fix the bug that can not convert array and map

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2750:
URL: https://github.com/apache/incubator-seatunnel/pull/2750#discussion_r973594479


##########
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java:
##########
@@ -213,4 +225,60 @@ private static SeaTunnelRow reconvert(InternalRow engineRow, SeaTunnelRowType ro
         }
         return new SeaTunnelRow(fields);
     }
+
+    private static DataType seaTunnelType2SparkType(SeaTunnelDataType<?> seaTunnelDataType) {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
+                return DataTypes.createArrayType(seaTunnelType2SparkType(elementType));
+            case MAP:
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) seaTunnelDataType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) seaTunnelDataType).getValueType();
+                return DataTypes.createMapType(seaTunnelType2SparkType(keyType), seaTunnelType2SparkType(valueType));
+            case STRING:
+                return DataTypes.StringType;
+            case BOOLEAN:
+                return DataTypes.BooleanType;
+            case TINYINT:
+                return DataTypes.ByteType;
+            case SMALLINT:
+                return DataTypes.ShortType;
+            case INT:
+                return DataTypes.IntegerType;
+            case BIGINT:
+                return DataTypes.LongType;
+            case FLOAT:
+                return DataTypes.FloatType;
+            case DOUBLE:
+                return DataTypes.DoubleType;
+            case DECIMAL:
+                int precision = ((DecimalType) seaTunnelDataType).getPrecision();
+                int scale = ((DecimalType) seaTunnelDataType).getScale();
+                return DataTypes.createDecimalType(precision, scale);
+            case NULL:
+                return DataTypes.NullType;
+            case BYTES:
+                return DataTypes.BinaryType;
+            case DATE:
+                return DataTypes.DateType;
+            case TIMESTAMP:
+                return DataTypes.TimestampType;
+            case TIME:
+                throw new RuntimeException("SeaTunnel not support time type, it will be supported in the future");
+            case ROW:
+                ArrayList<StructField> structFields = new ArrayList<>();
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
+                String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames();
+                for (int i = 0; i < fieldNames.length; i++) {
+                    StructField structField = new StructField(fieldNames[i], seaTunnelType2SparkType(fieldTypes[i]), true, null);
+                    structFields.add(structField);
+                }
+                return DataTypes.createStructType(structFields);
+            default:
+                // do nothing
+                // never get in there
+                return null;
+        }
+    }

Review Comment:
   Sorry, I hadn't seen that existing code. I will fix this right away.


