You are viewing a plain text version of this content. The canonical, hyperlinked version is available in the mailing list archive.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/20 02:48:19 UTC

[incubator-seatunnel] branch dev updated: [Jackson] [Core] fix jackson type convert error (#2031)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a6e1a2b1 [Jackson] [Core] fix jackson type convert error (#2031)
4a6e1a2b1 is described below

commit 4a6e1a2b1e136899121c414d8b6efd3ec13d9d27
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jun 20 10:48:15 2022 +0800

    [Jackson] [Core] fix jackson type convert error (#2031)
    
    * update contribute-plugin.md about plugin mapping.
    
    * update contribute-plugin.md about plugin mapping.
    
    * fix jackson type convert error
    
    * fix jackson type convert error
    
    Co-authored-by: TrickyZerg <32...@users.noreply.github.com>
---
 .../apache/seatunnel/flink/util/SchemaUtil.java    | 44 +++++++++++++++-------
 .../apache/seatunnel/common/utils/JsonUtils.java   |  5 +++
 .../flink/file/source/JsonRowInputFormat.java      | 27 ++++++-------
 .../seatunnel/fink/file/source/FileSourceTest.java |  2 +
 4 files changed, 51 insertions(+), 27 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index 8a4b79dd8..1e48b8c18 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -17,14 +17,20 @@
 
 package org.apache.seatunnel.flink.util;
 
-import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.flink.enums.FormatType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.LongNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -42,7 +48,7 @@ import org.apache.flink.types.Row;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
-import java.math.BigDecimal;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -112,18 +118,23 @@ public final class SchemaUtil {
     }
 
     private static void getJsonSchema(Schema schema, ObjectNode json) {
-        Map<String, Object> jsonMap = JsonUtils.toMap(json);
-        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+        Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
+        while (nodeIterator.hasNext()) {
+            Map.Entry<String, JsonNode> entry = nodeIterator.next();
             String key = entry.getKey();
             Object value = entry.getValue();
-            if (value instanceof String) {
+            if (value instanceof TextNode) {
                 schema.field(key, Types.STRING());
-            } else if (value instanceof Integer) {
+            } else if (value instanceof IntNode) {
                 schema.field(key, Types.INT());
-            } else if (value instanceof Long) {
+            } else if (value instanceof LongNode) {
                 schema.field(key, Types.LONG());
-            } else if (value instanceof BigDecimal) {
+            } else if (value instanceof DecimalNode) {
                 schema.field(key, Types.JAVA_BIG_DEC());
+            } else if (value instanceof FloatNode) {
+                schema.field(key,  Types.FLOAT());
+            } else if (value instanceof DoubleNode) {
+                schema.field(key, Types.DOUBLE());
             } else if (value instanceof ObjectNode) {
                 schema.field(key, getTypeInformation((ObjectNode) value));
             } else if (value instanceof ArrayNode) {
@@ -188,20 +199,25 @@ public final class SchemaUtil {
         int size = json.size();
         String[] fields = new String[size];
         TypeInformation<?>[] informations = new TypeInformation[size];
-        Map<String, Object> jsonMap = JsonUtils.toMap(json);
         int i = 0;
-        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+        Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
+        while (nodeIterator.hasNext()) {
+            Map.Entry<String, JsonNode> entry = nodeIterator.next();
             String key = entry.getKey();
             Object value = entry.getValue();
             fields[i] = key;
-            if (value instanceof String) {
+            if (value instanceof TextNode) {
                 informations[i] = Types.STRING();
-            } else if (value instanceof Integer) {
+            } else if (value instanceof IntNode) {
                 informations[i] = Types.INT();
-            } else if (value instanceof Long) {
+            } else if (value instanceof LongNode) {
                 informations[i] = Types.LONG();
-            } else if (value instanceof BigDecimal) {
+            } else if (value instanceof DecimalNode) {
                 informations[i] = Types.JAVA_BIG_DEC();
+            } else if (value instanceof FloatNode) {
+                informations[i] = Types.FLOAT();
+            } else if (value instanceof DoubleNode) {
+                informations[i] = Types.DOUBLE();
             } else if (value instanceof ObjectNode) {
                 informations[i] = getTypeInformation((ObjectNode) value);
             } else if (value instanceof ArrayNode) {
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 5f362059d..eb2ded86c 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -74,6 +75,10 @@ public class JsonUtils {
         return OBJECT_MAPPER.valueToTree(obj);
     }
 
+    public static JsonNode stringToJsonNode(String obj) throws JsonProcessingException {
+        return OBJECT_MAPPER.readTree(obj);
+    }
+
     /**
      * json representation of object
      *
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
index bac383586..1b3cd5ef6 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.flink.file.source;
 
 import org.apache.seatunnel.common.utils.JsonUtils;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -31,6 +29,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements ResultTypeQueryable<Row> {
 
@@ -59,7 +59,7 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
         }
 
         String str = new String(bytes, offset, numBytes, this.charsetName);
-        ObjectNode json = JsonUtils.parseObject(str);
+        Map<String, Object> json = JsonUtils.toMap(JsonUtils.stringToJsonNode(str));
         Row reuseRow;
         if (reuse == null) {
             reuseRow = new Row(rowTypeInfo.getArity());
@@ -70,26 +70,27 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
         return reuseRow;
     }
 
-    private void setJsonRow(Row row, ObjectNode json, RowTypeInfo rowTypeInfo) {
+    private void setJsonRow(Row row, Map<String, Object> json, RowTypeInfo rowTypeInfo) {
         String[] fieldNames = rowTypeInfo.getFieldNames();
         int i = 0;
         for (String name : fieldNames) {
             Object value = json.get(name);
-            if (value instanceof ObjectNode) {
-                TypeInformation information = rowTypeInfo.getTypeAt(name);
+            if (value instanceof Map) {
+                TypeInformation<?> information = rowTypeInfo.getTypeAt(name);
                 Row r = new Row(information.getArity());
-                setJsonRow(r, (ObjectNode) value, (RowTypeInfo) information);
+                setJsonRow(r, (Map<String, Object>) value, (RowTypeInfo) information);
                 row.setField(i++, r);
-            } else if (value instanceof ArrayNode) {
-                ObjectArrayTypeInfo information = (ObjectArrayTypeInfo) rowTypeInfo.getTypeAt(name);
-                ArrayNode array = (ArrayNode) value;
+            } else if (value instanceof List) {
+                ObjectArrayTypeInfo<?, ?> information =
+                        (ObjectArrayTypeInfo<?, ?>) rowTypeInfo.getTypeAt(name);
+                List<?> array = (List<?>) value;
                 Object[] objects = new Object[array.size()];
                 int j = 0;
                 for (Object o : array) {
-                    if (o instanceof ObjectNode) {
-                        TypeInformation componentInfo = information.getComponentInfo();
+                    if (o instanceof Map) {
+                        TypeInformation<?> componentInfo = information.getComponentInfo();
                         Row r = new Row(componentInfo.getArity());
-                        setJsonRow(r, (ObjectNode) o, (RowTypeInfo) componentInfo);
+                        setJsonRow(r, (Map<String, Object>) o, (RowTypeInfo) componentInfo);
                         objects[j++] = r;
                     } else {
                         objects[j++] = o;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
index 7a5829655..4113c6e14 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
@@ -46,6 +46,7 @@ public class FileSourceTest {
         FlinkEnvironment flinkEnvironment = createFlinkStreamEnvironment(configFile);
 
         try (FileSource fileSource = createFileSource(configFile, flinkEnvironment)) {
+            fileSource.prepare(flinkEnvironment);
             DataSet<Row> data = fileSource.getData(flinkEnvironment);
             Assert.assertNotNull(data);
         }
@@ -57,6 +58,7 @@ public class FileSourceTest {
         FlinkEnvironment flinkEnvironment = createFlinkStreamEnvironment(configFile);
 
         try (FileSource fileSource = createFileSource(configFile, flinkEnvironment)) {
+            fileSource.prepare(flinkEnvironment);
             DataSet<Row> data = fileSource.getData(flinkEnvironment);
             Assert.assertNotNull(data);
         }