You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/20 02:48:19 UTC
[incubator-seatunnel] branch dev updated: [Jackson] [Core] fix jackson type convert error (#2031)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a6e1a2b1 [Jackson] [Core] fix jackson type convert error (#2031)
4a6e1a2b1 is described below
commit 4a6e1a2b1e136899121c414d8b6efd3ec13d9d27
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jun 20 10:48:15 2022 +0800
[Jackson] [Core] fix jackson type convert error (#2031)
* update contribute-plugin.md about plugin mapping.
* update contribute-plugin.md about plugin mapping.
* fix jackson type convert error
* fix jackson type convert error
Co-authored-by: TrickyZerg <32...@users.noreply.github.com>
---
.../apache/seatunnel/flink/util/SchemaUtil.java | 44 +++++++++++++++-------
.../apache/seatunnel/common/utils/JsonUtils.java | 5 +++
.../flink/file/source/JsonRowInputFormat.java | 27 ++++++-------
.../seatunnel/fink/file/source/FileSourceTest.java | 2 +
4 files changed, 51 insertions(+), 27 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index 8a4b79dd8..1e48b8c18 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -17,14 +17,20 @@
package org.apache.seatunnel.flink.util;
-import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -42,7 +48,7 @@ import org.apache.flink.types.Row;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.math.BigDecimal;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -112,18 +118,23 @@ public final class SchemaUtil {
}
private static void getJsonSchema(Schema schema, ObjectNode json) {
- Map<String, Object> jsonMap = JsonUtils.toMap(json);
- for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+ Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
+ while (nodeIterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = nodeIterator.next();
String key = entry.getKey();
Object value = entry.getValue();
- if (value instanceof String) {
+ if (value instanceof TextNode) {
schema.field(key, Types.STRING());
- } else if (value instanceof Integer) {
+ } else if (value instanceof IntNode) {
schema.field(key, Types.INT());
- } else if (value instanceof Long) {
+ } else if (value instanceof LongNode) {
schema.field(key, Types.LONG());
- } else if (value instanceof BigDecimal) {
+ } else if (value instanceof DecimalNode) {
schema.field(key, Types.JAVA_BIG_DEC());
+ } else if (value instanceof FloatNode) {
+ schema.field(key, Types.FLOAT());
+ } else if (value instanceof DoubleNode) {
+ schema.field(key, Types.DOUBLE());
} else if (value instanceof ObjectNode) {
schema.field(key, getTypeInformation((ObjectNode) value));
} else if (value instanceof ArrayNode) {
@@ -188,20 +199,25 @@ public final class SchemaUtil {
int size = json.size();
String[] fields = new String[size];
TypeInformation<?>[] informations = new TypeInformation[size];
- Map<String, Object> jsonMap = JsonUtils.toMap(json);
int i = 0;
- for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+ Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
+ while (nodeIterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = nodeIterator.next();
String key = entry.getKey();
Object value = entry.getValue();
fields[i] = key;
- if (value instanceof String) {
+ if (value instanceof TextNode) {
informations[i] = Types.STRING();
- } else if (value instanceof Integer) {
+ } else if (value instanceof IntNode) {
informations[i] = Types.INT();
- } else if (value instanceof Long) {
+ } else if (value instanceof LongNode) {
informations[i] = Types.LONG();
- } else if (value instanceof BigDecimal) {
+ } else if (value instanceof DecimalNode) {
informations[i] = Types.JAVA_BIG_DEC();
+ } else if (value instanceof FloatNode) {
+ informations[i] = Types.FLOAT();
+ } else if (value instanceof DoubleNode) {
+ informations[i] = Types.DOUBLE();
} else if (value instanceof ObjectNode) {
informations[i] = getTypeInformation((ObjectNode) value);
} else if (value instanceof ArrayNode) {
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 5f362059d..eb2ded86c 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -74,6 +75,10 @@ public class JsonUtils {
return OBJECT_MAPPER.valueToTree(obj);
}
+ public static JsonNode stringToJsonNode(String obj) throws JsonProcessingException {
+ return OBJECT_MAPPER.readTree(obj);
+ }
+
/**
* json representation of object
*
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
index bac383586..1b3cd5ef6 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.flink.file.source;
import org.apache.seatunnel.common.utils.JsonUtils;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -31,6 +29,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements ResultTypeQueryable<Row> {
@@ -59,7 +59,7 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
}
String str = new String(bytes, offset, numBytes, this.charsetName);
- ObjectNode json = JsonUtils.parseObject(str);
+ Map<String, Object> json = JsonUtils.toMap(JsonUtils.stringToJsonNode(str));
Row reuseRow;
if (reuse == null) {
reuseRow = new Row(rowTypeInfo.getArity());
@@ -70,26 +70,27 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
return reuseRow;
}
- private void setJsonRow(Row row, ObjectNode json, RowTypeInfo rowTypeInfo) {
+ private void setJsonRow(Row row, Map<String, Object> json, RowTypeInfo rowTypeInfo) {
String[] fieldNames = rowTypeInfo.getFieldNames();
int i = 0;
for (String name : fieldNames) {
Object value = json.get(name);
- if (value instanceof ObjectNode) {
- TypeInformation information = rowTypeInfo.getTypeAt(name);
+ if (value instanceof Map) {
+ TypeInformation<?> information = rowTypeInfo.getTypeAt(name);
Row r = new Row(information.getArity());
- setJsonRow(r, (ObjectNode) value, (RowTypeInfo) information);
+ setJsonRow(r, (Map<String, Object>) value, (RowTypeInfo) information);
row.setField(i++, r);
- } else if (value instanceof ArrayNode) {
- ObjectArrayTypeInfo information = (ObjectArrayTypeInfo) rowTypeInfo.getTypeAt(name);
- ArrayNode array = (ArrayNode) value;
+ } else if (value instanceof List) {
+ ObjectArrayTypeInfo<?, ?> information =
+ (ObjectArrayTypeInfo<?, ?>) rowTypeInfo.getTypeAt(name);
+ List<?> array = (List<?>) value;
Object[] objects = new Object[array.size()];
int j = 0;
for (Object o : array) {
- if (o instanceof ObjectNode) {
- TypeInformation componentInfo = information.getComponentInfo();
+ if (o instanceof Map) {
+ TypeInformation<?> componentInfo = information.getComponentInfo();
Row r = new Row(componentInfo.getArity());
- setJsonRow(r, (ObjectNode) o, (RowTypeInfo) componentInfo);
+ setJsonRow(r, (Map<String, Object>) o, (RowTypeInfo) componentInfo);
objects[j++] = r;
} else {
objects[j++] = o;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
index 7a5829655..4113c6e14 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/fink/file/source/FileSourceTest.java
@@ -46,6 +46,7 @@ public class FileSourceTest {
FlinkEnvironment flinkEnvironment = createFlinkStreamEnvironment(configFile);
try (FileSource fileSource = createFileSource(configFile, flinkEnvironment)) {
+ fileSource.prepare(flinkEnvironment);
DataSet<Row> data = fileSource.getData(flinkEnvironment);
Assert.assertNotNull(data);
}
@@ -57,6 +58,7 @@ public class FileSourceTest {
FlinkEnvironment flinkEnvironment = createFlinkStreamEnvironment(configFile);
try (FileSource fileSource = createFileSource(configFile, flinkEnvironment)) {
+ fileSource.prepare(flinkEnvironment);
DataSet<Row> data = fileSource.getData(flinkEnvironment);
Assert.assertNotNull(data);
}