You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/29 04:04:21 UTC

[incubator-seatunnel] branch dev updated: [Bug][format][json] Fix jackson package conflict with spark (#2934)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1a92b8369 [Bug][format][json] Fix jackson package conflict with spark (#2934)
1a92b8369 is described below

commit 1a92b8369b500271de34a3d7419a2d045e04ce14
Author: hailin0 <wa...@apache.org>
AuthorDate: Thu Sep 29 12:04:16 2022 +0800

    [Bug][format][json] Fix jackson package conflict with spark (#2934)
---
 .../http/source/DeserializationCollector.java      |  20 +-
 .../seatunnel/e2e/connector/redis/RedisIT.java     |   3 +-
 seatunnel-formats/seatunnel-format-json/pom.xml    |  43 ++++
 .../format/json/JsonDeserializationSchema.java     |  17 +-
 .../seatunnel/format/json/JsonToRowConverters.java | 202 +++++++++++++-----
 .../seatunnel/format/json/RowToJsonConverters.java | 228 +++++++++++++++------
 6 files changed, 372 insertions(+), 141 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
index f01bc7ff0..59743c783 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/DeserializationCollector.java
@@ -22,8 +22,6 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import lombok.AllArgsConstructor;
 
 import java.io.IOException;
@@ -35,26 +33,10 @@ public class DeserializationCollector {
 
     public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
         if (deserializationSchema instanceof JsonDeserializationSchema) {
-            collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
+            ((JsonDeserializationSchema) deserializationSchema).collect(message, out);
         } else {
             SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
             out.collect(deserialize);
         }
     }
-
-    private void collectJson(byte[] message,
-                             JsonDeserializationSchema jsonDeserializationSchema,
-                             Collector<SeaTunnelRow> out) throws IOException {
-        JsonNode jsonNode = jsonDeserializationSchema.convertBytes(message);
-        if (jsonNode.isArray()) {
-            ArrayNode arrayNode = (ArrayNode) jsonNode;
-            for (int i = 0; i < arrayNode.size(); i++) {
-                SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(arrayNode.get(i));
-                out.collect(deserialize);
-            }
-        } else {
-            SeaTunnelRow deserialize = jsonDeserializationSchema.convertJsonNode(jsonNode);
-            out.collect(deserialize);
-        }
-    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 86e853ccd..8e82fc8b4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
 import lombok.extern.slf4j.Slf4j;
@@ -57,7 +56,6 @@ import java.util.stream.Stream;
 
 import scala.Tuple2;
 
-@DisabledOnContainer(value = "spark:2.4.3", disabledReason = "json-format conflicts with the Jackson version of Spark-2.4.3, see:https://github.com/apache/incubator-seatunnel/issues/2929")
 @Slf4j
 public class RedisIT extends TestSuiteBase implements TestResource {
     private static final String IMAGE = "redis:latest";
@@ -162,6 +160,7 @@ public class RedisIT extends TestSuiteBase implements TestResource {
     private void initJedis() {
         Jedis jedis = new Jedis(redisContainer.getHost(), redisContainer.getFirstMappedPort());
         jedis.auth(PASSWORD);
+        jedis.ping();
         this.jedis = jedis;
     }
 
diff --git a/seatunnel-formats/seatunnel-format-json/pom.xml b/seatunnel-formats/seatunnel-format-json/pom.xml
index 7642c77e5..4b5070244 100644
--- a/seatunnel-formats/seatunnel-format-json/pom.xml
+++ b/seatunnel-formats/seatunnel-format-json/pom.xml
@@ -33,7 +33,50 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- make sure that flatten runs after maven-shade-plugin -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>flatten-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index 5d6f199a5..518ece6b8 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.CompositeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,6 +33,7 @@ import com.fasterxml.jackson.core.json.JsonReadFeature;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import java.io.IOException;
 
@@ -107,7 +109,32 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne
         return convertJsonNode(convertBytes(message));
     }
 
-    public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
+    /**
+     * Parses {@code message} as JSON and emits the resulting row(s) to {@code out}.
+     *
+     * <p>A top-level JSON array yields one row per element, in array order;
+     * any other top-level value yields exactly one row.
+     *
+     * @param message raw JSON bytes to deserialize
+     * @param out collector that receives every converted row
+     * @throws IOException propagated from {@code convertBytes} or {@code convertJsonNode}
+     */
+    public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        JsonNode jsonNode = convertBytes(message);
+        if (jsonNode.isArray()) {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            for (int i = 0; i < arrayNode.size(); i++) {
+                SeaTunnelRow deserialize = convertJsonNode(arrayNode.get(i));
+                out.collect(deserialize);
+            }
+        } else {
+            // Non-array payloads (e.g. a single JSON object) must still be emitted, not dropped.
+            SeaTunnelRow deserialize = convertJsonNode(jsonNode);
+            out.collect(deserialize);
+        }
+    }
+
+    private SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
         if (jsonNode == null) {
             return null;
         }
@@ -122,7 +135,7 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne
         }
     }
 
-    public JsonNode convertBytes(byte[] message) throws IOException {
+    private JsonNode convertBytes(byte[] message) throws IOException {
         try {
             return objectMapper.readTree(message);
         } catch (Throwable t) {
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 8c0aceab9..14e662897 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -45,6 +45,9 @@ import java.time.temporal.TemporalQueries;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
 
 /** Tool class used to convert from {@link JsonNode} to {@link org.apache.seatunnel.api.table.type.SeaTunnelRow}. * */
 public class JsonToRowConverters implements Serializable {
@@ -65,8 +68,8 @@ public class JsonToRowConverters implements Serializable {
     private final boolean ignoreParseErrors;
 
     public JsonToRowConverters(
-            boolean failOnMissingField,
-            boolean ignoreParseErrors) {
+        boolean failOnMissingField,
+        boolean ignoreParseErrors) {
         this.failOnMissingField = failOnMissingField;
         this.ignoreParseErrors = ignoreParseErrors;
     }
@@ -83,33 +86,103 @@ public class JsonToRowConverters implements Serializable {
             case ROW:
                 return createRowConverter((SeaTunnelRowType) type);
             case NULL:
-                return jsonNode -> null;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return null;
+                    }
+                };
             case BOOLEAN:
-                return this::convertToBoolean;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToBoolean(jsonNode);
+                    }
+                };
             case TINYINT:
-                return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return Byte.parseByte(jsonNode.asText().trim());
+                    }
+                };
             case SMALLINT:
-                return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return Short.parseShort(jsonNode.asText().trim());
+                    }
+                };
             case INT:
-                return this::convertToInt;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToInt(jsonNode);
+                    }
+                };
             case BIGINT:
-                return this::convertToLong;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToLong(jsonNode);
+                    }
+                };
             case DATE:
-                return this::convertToLocalDate;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToLocalDate(jsonNode);
+                    }
+                };
             case TIME:
-                return this::convertToLocalTime;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToLocalTime(jsonNode);
+                    }
+                };
             case TIMESTAMP:
-                return this::convertToLocalDateTime;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToLocalDateTime(jsonNode);
+                    }
+                };
             case FLOAT:
-                return this::convertToFloat;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToFloat(jsonNode);
+                    }
+                };
             case DOUBLE:
-                return this::convertToDouble;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToDouble(jsonNode);
+                    }
+                };
             case STRING:
-                return this::convertToString;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToString(jsonNode);
+                    }
+                };
             case BYTES:
-                return this::convertToBytes;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToBytes(jsonNode);
+                    }
+                };
             case DECIMAL:
-                return this::convertToBigDecimal;
+                return new JsonToRowConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode) {
+                        return convertToBigDecimal(jsonNode);
+                    }
+                };
             case ARRAY:
                 return createArrayConverter((ArrayType<?, ?>) type);
             case MAP:
@@ -210,46 +283,72 @@ public class JsonToRowConverters implements Serializable {
     private JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
+        // Build one converter per field, in schema order. The mapper is typed to
+        // JsonToRowConverter so the stream's element type matches the array created below.
         final JsonToRowConverter[] fieldConverters =
             Arrays.stream(rowType.getFieldTypes())
-                .map(this::createConverter)
-                .toArray(JsonToRowConverter[]::new);
+                .map(new Function<SeaTunnelDataType<?>, JsonToRowConverter>() {
+                    @Override
+                    public JsonToRowConverter apply(SeaTunnelDataType<?> seaTunnelDataType) {
+                        return createConverter(seaTunnelDataType);
+                    }
+                })
+                .toArray(new IntFunction<JsonToRowConverter[]>() {
+                    @Override
+                    public JsonToRowConverter[] apply(int value) {
+                        return new JsonToRowConverter[value];
+                    }
+                });
         final String[] fieldNames = rowType.getFieldNames();
 
-        return jsonNode -> {
-            ObjectNode node = (ObjectNode) jsonNode;
-            int arity = fieldNames.length;
-            SeaTunnelRow row = new SeaTunnelRow(arity);
-            for (int i = 0; i < arity; i++) {
-                String fieldName = fieldNames[i];
-                JsonNode field = node.get(fieldName);
-                try {
-                    Object convertedField = convertField(fieldConverters[i], fieldName, field);
-                    row.setField(i, convertedField);
-                } catch (Throwable t) {
-                    throw new JsonParseException(
-                        String.format("Fail to deserialize at field: %s.", fieldName), t);
+        return new JsonToRowConverter() {
+            @Override
+            public Object convert(JsonNode jsonNode) {
+                ObjectNode node = (ObjectNode) jsonNode;
+                int arity = fieldNames.length;
+                SeaTunnelRow row = new SeaTunnelRow(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    JsonNode field = node.get(fieldName);
+                    try {
+                        Object convertedField = convertField(fieldConverters[i], fieldName, field);
+                        row.setField(i, convertedField);
+                    } catch (Throwable t) {
+                        throw new JsonParseException(
+                            String.format("Fail to deserialize at field: %s.", fieldName), t);
+                    }
                 }
+                return row;
             }
-            return row;
         };
     }
 
     private JsonToRowConverter createArrayConverter(ArrayType<?, ?> type) {
         JsonToRowConverter valueConverter = createConverter(type.getElementType());
-        return jsonNode -> {
-            Object arr = Array.newInstance(type.getElementType().getTypeClass(), jsonNode.size());
-            for (int i = 0; i < jsonNode.size(); i++) {
-                Array.set(arr, i, valueConverter.convert(jsonNode.get(i)));
+        return new JsonToRowConverter() {
+            @Override
+            public Object convert(JsonNode jsonNode) {
+                Object arr = Array.newInstance(type.getElementType().getTypeClass(), jsonNode.size());
+                for (int i = 0; i < jsonNode.size(); i++) {
+                    Array.set(arr, i, valueConverter.convert(jsonNode.get(i)));
+                }
+                return arr;
             }
-            return arr;
         };
     }
 
     private JsonToRowConverter createMapConverter(MapType<?, ?> type) {
         JsonToRowConverter valueConverter = createConverter(type.getValueType());
-        return jsonNode -> {
-            Map<Object, Object> value = new HashMap<>();
-            jsonNode.fields().forEachRemaining(entry -> value.put(entry.getKey(), valueConverter.convert(entry.getValue())));
-            return value;
+        return new JsonToRowConverter() {
+            @Override
+            public Object convert(JsonNode jsonNode) {
+                Map<Object, Object> value = new HashMap<>();
+                jsonNode.fields().forEachRemaining(new Consumer<Map.Entry<String, JsonNode>>() {
+                    @Override
+                    public void accept(Map.Entry<String, JsonNode> entry) {
+                        value.put(entry.getKey(), valueConverter.convert(entry.getValue()));
+                    }
+                });
+                return value;
+            }
         };
     }
 
@@ -267,17 +364,20 @@ public class JsonToRowConverters implements Serializable {
     }
 
     private JsonToRowConverter wrapIntoNullableConverter(JsonToRowConverter converter) {
-        return jsonNode -> {
-            if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
-                return null;
-            }
-            try {
-                return converter.convert(jsonNode);
-            } catch (Throwable t) {
-                if (!ignoreParseErrors) {
-                    throw t;
+        return new JsonToRowConverter() {
+            @Override
+            public Object convert(JsonNode jsonNode) {
+                if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+                    return null;
+                }
+                try {
+                    return converter.convert(jsonNode);
+                } catch (Throwable t) {
+                    if (!ignoreParseErrors) {
+                        throw t;
+                    }
+                    return null;
                 }
-                return null;
             }
         };
     }
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 02c6ab9a7..525ef7292 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -40,6 +40,8 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
 
 public class RowToJsonConverters implements Serializable {
 
@@ -50,11 +52,14 @@ public class RowToJsonConverters implements Serializable {
     }
 
     private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter converter) {
-        return (mapper, reuse, value) -> {
-            if (value == null) {
-                return mapper.getNodeFactory().nullNode();
+        return new RowToJsonConverter() {
+            @Override
+            public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                if (value == null) {
+                    return mapper.getNodeFactory().nullNode();
+                }
+                return converter.convert(mapper, reuse, value);
             }
-            return converter.convert(mapper, reuse, value);
         };
     }
 
@@ -64,33 +69,103 @@ public class RowToJsonConverters implements Serializable {
             case ROW:
                 return createRowConverter((SeaTunnelRowType) type);
             case NULL:
-                return (mapper, reuse, value) -> null;
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return null;
+                    }
+                };
             case BOOLEAN:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().booleanNode((Boolean) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().booleanNode((Boolean) value);
+                    }
+                };
             case TINYINT:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((byte) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((byte) value);
+                    }
+                };
             case SMALLINT:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((short) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((short) value);
+                    }
+                };
             case INT:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((int) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((int) value);
+                    }
+                };
             case BIGINT:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((long) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((long) value);
+                    }
+                };
             case FLOAT:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((float) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((float) value);
+                    }
+                };
             case DOUBLE:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((double) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((double) value);
+                    }
+                };
             case DECIMAL:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().numberNode((BigDecimal) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().numberNode((BigDecimal) value);
+                    }
+                };
             case BYTES:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().binaryNode((byte[]) value);
+                    }
+                };
             case STRING:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode((String) value);
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().textNode((String) value);
+                    }
+                };
             case DATE:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format((LocalDate) value));
+                    }
+                };
             case TIME:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value));
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value));
+                    }
+                };
             case TIMESTAMP:
-                return (mapper, reuse, value) -> mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                        return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
+                    }
+                };
             case ARRAY:
                 return createArrayConverter((ArrayType) type);
             case MAP:
@@ -104,53 +179,69 @@ public class RowToJsonConverters implements Serializable {
     private RowToJsonConverter createRowConverter(SeaTunnelRowType rowType) {
         final RowToJsonConverter[] fieldConverters =
                 Arrays.stream(rowType.getFieldTypes())
-                        .map(this::createConverter)
-                        .toArray(RowToJsonConverter[]::new);
+                        .map(new Function<SeaTunnelDataType<?>, Object>() {
+                            @Override
+                            public Object apply(SeaTunnelDataType<?> seaTunnelDataType) {
+                                return createConverter(seaTunnelDataType);
+                            }
+                        })
+                        .toArray(new IntFunction<RowToJsonConverter[]>() {
+                            @Override
+                            public RowToJsonConverter[] apply(int value) {
+                                return new RowToJsonConverter[value];
+                            }
+                        });
         final String[] fieldNames = rowType.getFieldNames();
         final int arity = fieldNames.length;
 
-        return (mapper, reuse, value) -> {
-            ObjectNode node;
+        return new RowToJsonConverter() {
+            @Override
+            public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                ObjectNode node;
 
-            // reuse could be a NullNode if last record is null.
-            if (reuse == null || reuse.isNull()) {
-                node = mapper.createObjectNode();
-            } else {
-                node = (ObjectNode) reuse;
-            }
+                // reuse could be a NullNode if last record is null.
+                if (reuse == null || reuse.isNull()) {
+                    node = mapper.createObjectNode();
+                } else {
+                    node = (ObjectNode) reuse;
+                }
 
-            for (int i = 0; i < arity; i++) {
-                String fieldName = fieldNames[i];
-                SeaTunnelRow row = (SeaTunnelRow) value;
-                node.set(fieldName, fieldConverters[i].convert(
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    SeaTunnelRow row = (SeaTunnelRow) value;
+                    node.set(fieldName, fieldConverters[i].convert(
                         mapper, node.get(fieldName), row.getField(i)));
-            }
+                }
 
-            return node;
+                return node;
+            }
         };
     }
 
     private RowToJsonConverter createArrayConverter(ArrayType arrayType) {
         final RowToJsonConverter elementConverter = createConverter(arrayType.getElementType());
-        return (mapper, reuse, value) -> {
-            ArrayNode node;
-
-            // reuse could be a NullNode if last record is null.
-            if (reuse == null || reuse.isNull()) {
-                node = mapper.createArrayNode();
-            } else {
-                node = (ArrayNode) reuse;
-                node.removeAll();
-            }
+        return new RowToJsonConverter() {
+            @Override
+            public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                ArrayNode node;
 
-            Object[] arrayData = (Object[]) value;
-            int numElements = arrayData.length;
-            for (int i = 0; i < numElements; i++) {
-                Object element = arrayData[i];
-                node.add(elementConverter.convert(mapper, null, element));
-            }
+                // reuse could be a NullNode if last record is null.
+                if (reuse == null || reuse.isNull()) {
+                    node = mapper.createArrayNode();
+                } else {
+                    node = (ArrayNode) reuse;
+                    node.removeAll();
+                }
+
+                Object[] arrayData = (Object[]) value;
+                int numElements = arrayData.length;
+                for (int i = 0; i < numElements; i++) {
+                    Object element = arrayData[i];
+                    node.add(elementConverter.convert(mapper, null, element));
+                }
 
-            return node;
+                return node;
+            }
         };
     }
 
@@ -161,24 +252,27 @@ public class RowToJsonConverters implements Serializable {
         }
 
         final RowToJsonConverter valueConverter = createConverter(valueType);
-        return (mapper, reuse, value) -> {
-            ObjectNode node;
-
-            // reuse could be a NullNode if last record is null.
-            if (reuse == null || reuse.isNull()) {
-                node = mapper.createObjectNode();
-            } else {
-                node = (ObjectNode) reuse;
-                node.removeAll();
-            }
+        return new RowToJsonConverter() {
+            @Override
+            public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
+                ObjectNode node;
 
-            Map<String, ?> mapData = (Map) value;
-            for (Map.Entry<String, ?> entry : mapData.entrySet()) {
-                String fieldName = entry.getKey();
-                node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
-            }
+                // reuse could be a NullNode if last record is null.
+                if (reuse == null || reuse.isNull()) {
+                    node = mapper.createObjectNode();
+                } else {
+                    node = (ObjectNode) reuse;
+                    node.removeAll();
+                }
 
-            return node;
+                Map<String, ?> mapData = (Map) value;
+                for (Map.Entry<String, ?> entry : mapData.entrySet()) {
+                    String fieldName = entry.getKey();
+                    node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
+                }
+
+                return node;
+            }
         };
     }