You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/29 04:04:21 UTC
[incubator-seatunnel] branch dev updated: [Bug][format][json] Fix jackson package conflict with spark (#2934)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1a92b8369 [Bug][format][json] Fix jackson package conflict with spark (#2934)
1a92b8369 is described below
commit 1a92b8369b500271de34a3d7419a2d045e04ce14
Author: hailin0 <wa...@apache.org>
AuthorDate: Thu Sep 29 12:04:16 2022 +0800
[Bug][format][json] Fix jackson package conflict with spark (#2934)
---
.../http/source/DeserializationCollector.java | 20 +-
.../seatunnel/e2e/connector/redis/RedisIT.java | 3 +-
seatunnel-formats/seatunnel-format-json/pom.xml | 43 ++++
.../format/json/JsonDeserializationSchema.java | 17 +-
.../seatunnel/format/json/JsonToRowConverters.java | 202 +++++++++++++-----
.../seatunnel/format/json/RowToJsonConverters.java | 228 +++++++++++++++------
6 files changed, 372 insertions(+), 141 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
index f01bc7ff0..59743c783 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
@@ -22,8 +22,6 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import lombok.AllArgsConstructor;
import java.io.IOException;
@@ -35,26 +33,10 @@ public class DeserializationCollector {
public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
if (deserializationSchema instanceof JsonDeserializationSchema) {
- collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
+ ((JsonDeserializationSchema) deserializationSchema).collect(message, out);
} else {
SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
out.collect(deserialize);
}
}
-
- private void collectJson(byte[] message,
- JsonDeserializationSchema jsonDeserializationSchema,
- Collector<SeaTunnelRow> out) throws IOException {
- JsonNode jsonNode = jsonDeserializationSchema.convertBytes(message);
- if (jsonNode.isArray()) {
- ArrayNode arrayNode = (ArrayNode) jsonNode;
- for (int i = 0; i < arrayNode.size(); i++) {
- SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(arrayNode.get(i));
- out.collect(deserialize);
- }
- } else {
- SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(jsonNode);
- out.collect(deserialize);
- }
- }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 86e853ccd..8e82fc8b4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import lombok.extern.slf4j.Slf4j;
@@ -57,7 +56,6 @@ import java.util.stream.Stream;
import scala.Tuple2;
-@DisabledOnContainer(value = "spark:2.4.3", disabledReason = "json-format conflicts with the Jackson version of Spark-2.4.3, see:https://github.com/apache/incubator-seatunnel/issues/2929")
@Slf4j
public class RedisIT extends TestSuiteBase implements TestResource {
private static final String IMAGE = "redis:latest";
@@ -162,6 +160,7 @@ public class RedisIT extends TestSuiteBase implements TestResource {
private void initJedis() {
Jedis jedis = new Jedis(redisContainer.getHost(), redisContainer.getFirstMappedPort());
jedis.auth(PASSWORD);
+ jedis.ping();
this.jedis = jedis;
}
diff --git a/seatunnel-formats/seatunnel-format-json/pom.xml b/seatunnel-formats/seatunnel-format-json/pom.xml
index 7642c77e5..4b5070244 100644
--- a/seatunnel-formats/seatunnel-format-json/pom.xml
+++ b/seatunnel-formats/seatunnel-format-json/pom.xml
@@ -33,7 +33,50 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>${seatunnel.shade.package}.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- make sure that flatten runs after maven-shade-plugin -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>flatten-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 5d6f199a5..518ece6b8 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.CompositeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,6 +33,7 @@ import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
@@ -107,7 +109,18 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne
return convertJsonNode(convertBytes(message));
}
- public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
+ /**
+ * Deserializes a JSON message and emits the resulting row(s) to the collector.
+ *
+ * <p>A top-level JSON array yields one row per element. Any other top-level
+ * value is converted and emitted as a single row, which is what the
+ * {@code collectJson} helper removed from the HTTP connector's
+ * {@code DeserializationCollector} did; without that branch a message holding
+ * a single JSON object would produce no rows at all.
+ *
+ * @param message raw JSON bytes to parse
+ * @param out collector that receives every converted row
+ * @throws IOException propagated from parsing the bytes or converting a node
+ */
+ public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+ JsonNode jsonNode = convertBytes(message);
+ if (jsonNode.isArray()) {
+ ArrayNode arrayNode = (ArrayNode) jsonNode;
+ for (int i = 0; i < arrayNode.size(); i++) {
+ SeaTunnelRow deserialize = convertJsonNode(arrayNode.get(i));
+ out.collect(deserialize);
+ }
+ } else {
+ // Non-array payload: treat the whole document as one row.
+ SeaTunnelRow deserialize = convertJsonNode(jsonNode);
+ out.collect(deserialize);
+ }
+ }
+
+ private SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
if (jsonNode == null) {
return null;
}
@@ -122,7 +135,7 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne
}
}
- public JsonNode convertBytes(byte[] message) throws IOException {
+ private JsonNode convertBytes(byte[] message) throws IOException {
try {
return objectMapper.readTree(message);
} catch (Throwable t) {
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 8c0aceab9..14e662897 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -45,6 +45,9 @@ import java.time.temporal.TemporalQueries;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
/** Tool class used to convert from {@link JsonNode} to {@link org.apache.seatunnel.api.table.type.SeaTunnelRow}. * */
public class JsonToRowConverters implements Serializable {
@@ -65,8 +68,8 @@ public class JsonToRowConverters implements Serializable {
private final boolean ignoreParseErrors;
public JsonToRowConverters(
- boolean failOnMissingField,
- boolean ignoreParseErrors) {
+ boolean failOnMissingField,
+ boolean ignoreParseErrors) {
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
}
@@ -83,33 +86,103 @@ public class JsonToRowConverters implements Serializable {
case ROW:
return createRowConverter((SeaTunnelRowType) type);
case NULL:
- return jsonNode -> null;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return null;
+ }
+ };
case BOOLEAN:
- return this::convertToBoolean;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToBoolean(jsonNode);
+ }
+ };
case TINYINT:
- return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return Byte.parseByte(jsonNode.asText().trim());
+ }
+ };
case SMALLINT:
- return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return Short.parseShort(jsonNode.asText().trim());
+ }
+ };
case INT:
- return this::convertToInt;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToInt(jsonNode);
+ }
+ };
case BIGINT:
- return this::convertToLong;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToLong(jsonNode);
+ }
+ };
case DATE:
- return this::convertToLocalDate;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToLocalDate(jsonNode);
+ }
+ };
case TIME:
- return this::convertToLocalTime;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToLocalTime(jsonNode);
+ }
+ };
case TIMESTAMP:
- return this::convertToLocalDateTime;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToLocalDateTime(jsonNode);
+ }
+ };
case FLOAT:
- return this::convertToFloat;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToFloat(jsonNode);
+ }
+ };
case DOUBLE:
- return this::convertToDouble;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToDouble(jsonNode);
+ }
+ };
case STRING:
- return this::convertToString;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToString(jsonNode);
+ }
+ };
case BYTES:
- return this::convertToBytes;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToBytes(jsonNode);
+ }
+ };
case DECIMAL:
- return this::convertToBigDecimal;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ return convertToBigDecimal(jsonNode);
+ }
+ };
case ARRAY:
return createArrayConverter((ArrayType<?, ?>) type);
case MAP:
@@ -210,46 +283,70 @@ public class JsonToRowConverters implements Serializable {
private JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
final JsonToRowConverter[] fieldConverters =
Arrays.stream(rowType.getFieldTypes())
- .map(this::createConverter)
- .toArray(JsonToRowConverter[]::new);
+ .map(new Function<SeaTunnelDataType<?>, Object>() {
+ @Override
+ public Object apply(SeaTunnelDataType<?> seaTunnelDataType) {
+ return createConverter(seaTunnelDataType);
+ }
+ })
+ .toArray(new IntFunction<JsonToRowConverter[]>() {
+ @Override
+ public JsonToRowConverter[] apply(int value) {
+ return new JsonToRowConverter[value];
+ }
+ });
final String[] fieldNames = rowType.getFieldNames();
- return jsonNode -> {
- ObjectNode node = (ObjectNode) jsonNode;
- int arity = fieldNames.length;
- SeaTunnelRow row = new SeaTunnelRow(arity);
- for (int i = 0; i < arity; i++) {
- String fieldName = fieldNames[i];
- JsonNode field = node.get(fieldName);
- try {
- Object convertedField = convertField(fieldConverters[i], fieldName, field);
- row.setField(i, convertedField);
- } catch (Throwable t) {
- throw new JsonParseException(
- String.format("Fail to deserialize at field: %s.", fieldName), t);
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ ObjectNode node = (ObjectNode) jsonNode;
+ int arity = fieldNames.length;
+ SeaTunnelRow row = new SeaTunnelRow(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ JsonNode field = node.get(fieldName);
+ try {
+ Object convertedField = convertField(fieldConverters[i], fieldName, field);
+ row.setField(i, convertedField);
+ } catch (Throwable t) {
+ throw new JsonParseException(
+ String.format("Fail to deserialize at field: %s.", fieldName), t);
+ }
}
+ return row;
}
- return row;
};
}
private JsonToRowConverter createArrayConverter(ArrayType<?, ?> type) {
JsonToRowConverter valueConverter = createConverter(type.getElementType());
- return jsonNode -> {
- Object arr = Array.newInstance(type.getElementType().getTypeClass(), jsonNode.size());
- for (int i = 0; i < jsonNode.size(); i++) {
- Array.set(arr, i, valueConverter.convert(jsonNode.get(i)));
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ Object arr = Array.newInstance(type.getElementType().getTypeClass(), jsonNode.size());
+ for (int i = 0; i < jsonNode.size(); i++) {
+ Array.set(arr, i, valueConverter.convert(jsonNode.get(i)));
+ }
+ return arr;
}
- return arr;
};
}
private JsonToRowConverter createMapConverter(MapType<?, ?> type) {
JsonToRowConverter valueConverter = createConverter(type.getValueType());
- return jsonNode -> {
- Map<Object, Object> value = new HashMap<>();
- jsonNode.fields().forEachRemaining(entry -> value.put(entry.getKey(), valueConverter.convert(entry.getValue())));
- return value;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ Map<Object, Object> value = new HashMap<>();
+ jsonNode.fields().forEachRemaining(new Consumer<Map.Entry<String, JsonNode>>() {
+ @Override
+ public void accept(Map.Entry<String, JsonNode> entry) {
+ value.put(entry.getKey(), valueConverter.convert(entry.getValue()));
+ }
+ });
+ return value;
+ }
};
}
@@ -267,17 +364,20 @@ public class JsonToRowConverters implements Serializable {
}
private JsonToRowConverter wrapIntoNullableConverter(JsonToRowConverter converter) {
- return jsonNode -> {
- if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
- return null;
- }
- try {
- return converter.convert(jsonNode);
- } catch (Throwable t) {
- if (!ignoreParseErrors) {
- throw t;
+ return new JsonToRowConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode) {
+ if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+ return null;
+ }
+ try {
+ return converter.convert(jsonNode);
+ } catch (Throwable t) {
+ if (!ignoreParseErrors) {
+ throw t;
+ }
+ return null;
}
- return null;
}
};
}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 02c6ab9a7..525ef7292 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -40,6 +40,8 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
public class RowToJsonConverters implements Serializable {
@@ -50,11 +52,14 @@ public class RowToJsonConverters implements Serializable {
}
private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter converter) {
- return (mapper, reuse, value) -> {
- if (value == null) {
- return mapper.getNodeFactory().nullNode();
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ if (value == null) {
+ return mapper.getNodeFactory().nullNode();
+ }
+ return converter.convert(mapper, reuse, value);
}
- return converter.convert(mapper, reuse, value);
};
}
@@ -64,33 +69,103 @@ public class RowToJsonConverters implements Serializable {
case ROW:
return createRowConverter((SeaTunnelRowType) type);
case NULL:
- return (mapper, reuse, value) -> null;
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return null;
+ }
+ };
case BOOLEAN:
- return (mapper, reuse, value) -> mapper.getNodeFactory().booleanNode((Boolean) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().booleanNode((Boolean) value);
+ }
+ };
case TINYINT:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((byte) value);
+ }
+ };
case SMALLINT:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((short) value);
+ }
+ };
case INT:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((int) value);
+ }
+ };
case BIGINT:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((long) value);
+ }
+ };
case FLOAT:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((float) value);
+ }
+ };
case DOUBLE:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((double) value);
+ }
+ };
case DECIMAL:
- return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((BigDecimal) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().numberNode((BigDecimal) value);
+ }
+ };
case BYTES:
- return (mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().binaryNode((byte[]) value);
+ }
+ };
case STRING:
- return (mapper, reuse, value) -> mapper.getNodeFactory().textNode((String) value);
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().textNode((String) value);
+ }
+ };
case DATE:
- return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
+ }
+ };
case TIME:
- return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value));
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value));
+ }
+ };
case TIMESTAMP:
- return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
+ }
+ };
case ARRAY:
return createArrayConverter((ArrayType) type);
case MAP:
@@ -104,53 +179,69 @@ public class RowToJsonConverters implements Serializable {
private RowToJsonConverter createRowConverter(SeaTunnelRowType rowType) {
final RowToJsonConverter[] fieldConverters =
Arrays.stream(rowType.getFieldTypes())
- .map(this::createConverter)
- .toArray(RowToJsonConverter[]::new);
+ .map(new Function<SeaTunnelDataType<?>, Object>() {
+ @Override
+ public Object apply(SeaTunnelDataType<?> seaTunnelDataType) {
+ return createConverter(seaTunnelDataType);
+ }
+ })
+ .toArray(new IntFunction<RowToJsonConverter[]>() {
+ @Override
+ public RowToJsonConverter[] apply(int value) {
+ return new RowToJsonConverter[value];
+ }
+ });
final String[] fieldNames = rowType.getFieldNames();
final int arity = fieldNames.length;
- return (mapper, reuse, value) -> {
- ObjectNode node;
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ ObjectNode node;
- // reuse could be a NullNode if last record is null.
- if (reuse == null || reuse.isNull()) {
- node = mapper.createObjectNode();
- } else {
- node = (ObjectNode) reuse;
- }
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ }
- for (int i = 0; i < arity; i++) {
- String fieldName = fieldNames[i];
- SeaTunnelRow row = (SeaTunnelRow) value;
- node.set(fieldName, fieldConverters[i].convert(
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ SeaTunnelRow row = (SeaTunnelRow) value;
+ node.set(fieldName, fieldConverters[i].convert(
mapper, node.get(fieldName), row.getField(i)));
- }
+ }
- return node;
+ return node;
+ }
};
}
private RowToJsonConverter createArrayConverter(ArrayType arrayType) {
final RowToJsonConverter elementConverter = createConverter(arrayType.getElementType());
- return (mapper, reuse, value) -> {
- ArrayNode node;
-
- // reuse could be a NullNode if last record is null.
- if (reuse == null || reuse.isNull()) {
- node = mapper.createArrayNode();
- } else {
- node = (ArrayNode) reuse;
- node.removeAll();
- }
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ ArrayNode node;
- Object[] arrayData = (Object[]) value;
- int numElements = arrayData.length;
- for (int i = 0; i < numElements; i++) {
- Object element = arrayData[i];
- node.add(elementConverter.convert(mapper, null, element));
- }
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createArrayNode();
+ } else {
+ node = (ArrayNode) reuse;
+ node.removeAll();
+ }
+
+ Object[] arrayData = (Object[]) value;
+ int numElements = arrayData.length;
+ for (int i = 0; i < numElements; i++) {
+ Object element = arrayData[i];
+ node.add(elementConverter.convert(mapper, null, element));
+ }
- return node;
+ return node;
+ }
};
}
@@ -161,24 +252,27 @@ public class RowToJsonConverters implements Serializable {
}
final RowToJsonConverter valueConverter = createConverter(valueType);
- return (mapper, reuse, value) -> {
- ObjectNode node;
-
- // reuse could be a NullNode if last record is null.
- if (reuse == null || reuse.isNull()) {
- node = mapper.createObjectNode();
- } else {
- node = (ObjectNode) reuse;
- node.removeAll();
- }
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+ ObjectNode node;
- Map<String, ?> mapData = (Map) value;
- for (Map.Entry<String, ?> entry : mapData.entrySet()) {
- String fieldName = entry.getKey();
- node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
- }
+ // reuse could be a NullNode if last record is null.
+ if (reuse == null || reuse.isNull()) {
+ node = mapper.createObjectNode();
+ } else {
+ node = (ObjectNode) reuse;
+ node.removeAll();
+ }
- return node;
+ Map<String, ?> mapData = (Map) value;
+ for (Map.Entry<String, ?> entry : mapData.entrySet()) {
+ String fieldName = entry.getKey();
+ node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
+ }
+
+ return node;
+ }
};
}