You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/15 02:15:33 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft][formats] json format (#2014)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 1673dcebb [api-draft][formats] json format (#2014)
1673dcebb is described below

commit 1673dcebb5d31a721cbe47fd1f4c8af7e9de6de2
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed Jun 15 10:15:28 2022 +0800

    [api-draft][formats] json format (#2014)
---
 pom.xml                                            |   1 +
 .../api/serialization/DeserializationSchema.java   |  44 ++++
 .../api/serialization/SerializationSchema.java     |  30 +++
 .../api/table/connector/DeserializationFormat.java |  39 +++
 .../api/table/connector/SerializationFormat.java   |  25 ++
 .../factory/DeserializationFormatFactory.java      |  24 ++
 .../table/factory/SerializationFormatFactory.java  |  24 ++
 seatunnel-formats/pom.xml                          |  33 +++
 seatunnel-formats/seatunnel-format-json/pom.xml    |  39 +++
 .../format/json/JsonDeserializationSchema.java     | 122 +++++++++
 .../seatunnel/format/json/JsonToRowConverters.java | 281 +++++++++++++++++++++
 11 files changed, 662 insertions(+)

diff --git a/pom.xml b/pom.xml
index a858f5b91..0e479d784 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <module>seatunnel-api</module>
         <module>seatunnel-translation</module>
         <module>seatunnel-plugin-discovery</module>
+        <module>seatunnel-formats</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
new file mode 100644
index 000000000..0260d733a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.serialization;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Contract for turning raw byte messages into records of type {@code T}.
+ *
+ * @param <T> the type of the records produced by this schema
+ */
+public interface DeserializationSchema<T> extends Serializable {
+
+    /**
+     * Deserializes a single byte message into one record.
+     *
+     * @param message The message, as a byte array.
+     * @return The deserialized record (null if the message cannot be deserialized).
+     * @throws IOException if the implementation fails to deserialize the message.
+     */
+    T deserialize(byte[] message) throws IOException;
+
+    /**
+     * Deserializes a byte message and hands the result to the given collector.
+     *
+     * <p>The default implementation delegates to {@link #deserialize(byte[])} and emits the
+     * record only when it is not null.
+     *
+     * @param message The message, as a byte array.
+     * @param out The collector that receives the deserialized record.
+     * @throws IOException if {@link #deserialize(byte[])} throws it.
+     */
+    default void deserialize(byte[] message, Collector<T> out) throws IOException {
+        final T result = deserialize(message);
+        if (result == null) {
+            // Nothing to emit for a message that produced no record.
+            return;
+        }
+        out.collect(result);
+    }
+
+    /**
+     * Returns the data type of the records this schema produces.
+     *
+     * @return the produced {@link SeaTunnelDataType}.
+     */
+    SeaTunnelDataType<T> getProducedType();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java
new file mode 100644
index 000000000..53f4c7293
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/SerializationSchema.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.serialization;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ * Contract for turning a {@link SeaTunnelRow} into its serialized byte representation.
+ *
+ * <p>The interface extends {@link java.io.Serializable}, in line with
+ * {@code DeserializationSchema}, so that implementations can be held by serializable objects.
+ */
+public interface SerializationSchema extends java.io.Serializable {
+    /**
+     * Serializes the incoming row into a byte array.
+     *
+     * @param element The incoming element to be serialized
+     * @return The serialized element.
+     */
+    byte[] serialize(SeaTunnelRow element);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java
new file mode 100644
index 000000000..6a3cb0ad5
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/DeserializationFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A format that can create a {@link DeserializationSchema} and may optionally expose
+ * readable metadata.
+ */
+public interface DeserializationFormat {
+
+    /**
+     * Creates the {@link DeserializationSchema} of this format.
+     *
+     * <p>NOTE(review): the return type is declared raw, so the produced record type is not
+     * visible to callers here.
+     *
+     * @return the deserialization schema.
+     */
+    DeserializationSchema createDeserializationSchema();
+
+    /**
+     * Lists the metadata that can be read through this format. The default implementation
+     * returns an empty map.
+     *
+     * @return a map to the metadata data types; presumably keyed by metadata key (confirm
+     *     against implementations).
+     */
+    default Map<String, SeaTunnelDataType<?>> listReadableMetadata() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Applies the given metadata keys to this format. The default implementation always throws,
+     * so a format has to override this method in order to accept metadata keys.
+     *
+     * @param metadataKeys the metadata keys to apply.
+     * @param dataType the data type passed along with the keys; its exact role is not visible
+     *     here (TODO confirm).
+     * @throws UnsupportedOperationException always, in the default implementation.
+     */
+    default void applyReadableMetadata(List<String> metadataKeys, SeaTunnelDataType<?> dataType) {
+        throw new UnsupportedOperationException(
+            "A decoding format must override this method to apply metadata keys.");
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java
new file mode 100644
index 000000000..bc0c5c052
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SerializationFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+
+/**
+ * A format that can create a {@link SerializationSchema}.
+ */
+public interface SerializationFormat {
+
+    /**
+     * Creates the {@link SerializationSchema} of this format.
+     *
+     * @return the serialization schema.
+     */
+    SerializationSchema createSerializationSchema();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java
new file mode 100644
index 000000000..29285f641
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/DeserializationFormatFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+
+/**
+ * Factory that creates a {@link DeserializationFormat}.
+ */
+public interface DeserializationFormatFactory extends Factory {
+    /**
+     * Creates a {@link DeserializationFormat} for the given context.
+     *
+     * @param context the table factory context handed to the factory.
+     * @return the created deserialization format.
+     */
+    DeserializationFormat createDeserializationFormat(TableFactoryContext context);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java
new file mode 100644
index 000000000..653bd03b3
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SerializationFormatFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+
+/**
+ * Factory that creates a {@link SerializationFormat}.
+ */
+public interface SerializationFormatFactory extends Factory {
+    /**
+     * Creates a {@link SerializationFormat} for the given context.
+     *
+     * @param context the table factory context handed to the factory.
+     * @return the created serialization format.
+     */
+    SerializationFormat createSerializationFormat(TableFactoryContext context);
+}
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
new file mode 100644
index 000000000..11c938b35
--- /dev/null
+++ b/seatunnel-formats/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-formats</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>seatunnel-format-json</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/pom.xml b/seatunnel-formats/seatunnel-format-json/pom.xml
new file mode 100644
index 000000000..7642c77e5
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-formats</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-format-json</artifactId>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
new file mode 100644
index 000000000..3510daae9
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.CompositeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema that parses JSON byte messages into {@link SeaTunnelRow}s according
+ * to a given {@link SeaTunnelRowType}.
+ */
+public class JsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Flag indicating whether to fail if a field is missing.
+     */
+    private final boolean failOnMissingField;
+
+    /**
+     * Flag indicating whether to ignore invalid fields/rows instead of throwing an exception.
+     */
+    private final boolean ignoreParseErrors;
+
+    /**
+     * The row type of the produced {@link SeaTunnelRow}.
+     */
+    private final SeaTunnelRowType rowType;
+
+    /**
+     * Runtime converter that converts a parsed {@link JsonNode} into a {@link SeaTunnelRow}.
+     */
+    private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
+
+    /**
+     * Object mapper for parsing the JSON.
+     */
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates a schema for the given row type.
+     *
+     * @param failOnMissingField whether a missing field makes deserialization fail.
+     * @param ignoreParseErrors whether parse errors are ignored instead of being thrown.
+     * @param rowType the row type of the produced rows; must not be null.
+     * @throws IllegalArgumentException if both flags are enabled at the same time.
+     */
+    public JsonDeserializationSchema(boolean failOnMissingField,
+                                     boolean ignoreParseErrors,
+                                     SeaTunnelRowType rowType) {
+        if (ignoreParseErrors && failOnMissingField) {
+            throw new IllegalArgumentException(
+                "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
+        }
+        this.rowType = checkNotNull(rowType);
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        // rowType has been null-checked above, so the field is reused without a second check.
+        this.runtimeConverter =
+            new JsonToRowConverters(failOnMissingField, ignoreParseErrors)
+                .createConverter(this.rowType);
+
+        // Keep decimal precision when the row type contains a DECIMAL field.
+        if (hasDecimalType(this.rowType)) {
+            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        }
+        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+    }
+
+    /**
+     * Checks whether the given type is DECIMAL or, for a {@link CompositeType}, whether any of
+     * its children (recursively) is.
+     */
+    private static boolean hasDecimalType(SeaTunnelDataType<?> dataType) {
+        if (SqlType.of(dataType) == SqlType.DECIMAL) {
+            return true;
+        }
+        if (dataType instanceof CompositeType) {
+            CompositeType<?> compositeType = (CompositeType<?>) dataType;
+            for (SeaTunnelDataType<?> child : compositeType.getChildren()) {
+                if (hasDecimalType(child)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Parses the JSON message and converts it into a row.
+     *
+     * @param message the JSON message as bytes; may be null.
+     * @return the converted row, or null if the message is null or if conversion failed while
+     *     {@code ignoreParseErrors} is enabled.
+     * @throws IOException if conversion fails and {@code ignoreParseErrors} is disabled.
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+        try {
+            return (SeaTunnelRow) runtimeConverter.convert(objectMapper.readTree(message));
+        } catch (Throwable t) {
+            if (ignoreParseErrors) {
+                return null;
+            }
+            // Decode with an explicit charset so the error message does not depend on the
+            // platform default encoding of the JVM.
+            throw new IOException(
+                format("Failed to deserialize JSON '%s'.",
+                    new String(message, java.nio.charset.StandardCharsets.UTF_8)), t);
+        }
+    }
+
+    /**
+     * Returns the row type this schema was created with.
+     */
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return this.rowType;
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
new file mode 100644
index 000000000..e6c28490f
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
+
+/** Tool class used to convert from {@link JsonNode} to {@link org.apache.seatunnel.api.table.type.SeaTunnelRow}. * */
+public class JsonToRowConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @SuppressWarnings("MagicNumber")
+    public static final DateTimeFormatter TIME_FORMAT =
+        new DateTimeFormatterBuilder()
+            .appendPattern("HH:mm:ss")
+            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+            .toFormatter();
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    public JsonToRowConverters(
+            boolean failOnMissingField,
+            boolean ignoreParseErrors) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+    }
+
+    /** Creates a runtime converter which is null safe. */
+    public JsonToRowConverter createConverter(SeaTunnelDataType<?> type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    /** Creates a runtime converter which assuming input object is not null. */
+    @SuppressWarnings("unchecked")
+    private JsonToRowConverter createNotNullConverter(SeaTunnelDataType<?> type) {
+        SqlType sqlType = SqlType.of(type);
+        switch (sqlType) {
+            case ROW:
+                return createRowConverter((SeaTunnelRowType) type);
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+            case SMALLINT:
+                return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+            case INT:
+                return this::convertToInt;
+            case BIGINT:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToLocalDate;
+            case TIME:
+                return this::convertToLocalTime;
+            case TIMESTAMP:
+                return this::convertToLocalDateTime;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case STRING:
+                return this::convertToString;
+            case BYTES:
+                return this::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((BasicType<BigDecimal>) type);
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance
+            return jsonNode.asBoolean();
+        } else {
+            return Boolean.parseBoolean(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToInt(JsonNode jsonNode) {
+        if (jsonNode.canConvertToInt()) {
+            // avoid redundant toString and parseInt, for better performance
+            return jsonNode.asInt();
+        } else {
+            return Integer.parseInt(jsonNode.asText().trim());
+        }
+    }
+
+    private long convertToLong(JsonNode jsonNode) {
+        if (jsonNode.canConvertToLong()) {
+            // avoid redundant toString and parseLong, for better performance
+            return jsonNode.asLong();
+        } else {
+            return Long.parseLong(jsonNode.asText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return jsonNode.asDouble();
+        } else {
+            return Double.parseDouble(jsonNode.asText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return (float) jsonNode.asDouble();
+        } else {
+            return Float.parseFloat(jsonNode.asText().trim());
+        }
+    }
+
+    private LocalDate convertToLocalDate(JsonNode jsonNode) {
+        return ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+    }
+
+    private LocalTime convertToLocalTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTime = TIME_FORMAT.parse(jsonNode.asText());
+        return parsedTime.query(TemporalQueries.localTime());
+    }
+
+    private LocalDateTime convertToLocalDateTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestamp = DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(jsonNode.asText());
+        LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+        return LocalDateTime.of(localDate, localTime);
+    }
+
+    private String convertToString(JsonNode jsonNode) {
+        if (jsonNode.isContainerNode()) {
+            return jsonNode.toString();
+        } else {
+            return jsonNode.asText();
+        }
+    }
+
+    private byte[] convertToBytes(JsonNode jsonNode) {
+        try {
+            return jsonNode.binaryValue();
+        } catch (IOException e) {
+            throw new JsonParseException("Unable to deserialize byte array.", e);
+        }
+    }
+
+    private JsonToRowConverter createDecimalConverter(BasicType<BigDecimal> decimalType) {
+        return jsonNode -> {
+            BigDecimal bigDecimal;
+            if (jsonNode.isBigDecimal()) {
+                bigDecimal = jsonNode.decimalValue();
+            } else {
+                bigDecimal = new BigDecimal(jsonNode.asText());
+            }
+
+            return bigDecimal;
+        };
+    }
+
+    public JsonToRowConverter createRowConverter(SeaTunnelRowType rowType) {
+        final JsonToRowConverter[] fieldConverters =
+                Arrays.stream(rowType.getFieldTypes())
+                        .map(this::createConverter)
+                        .toArray(JsonToRowConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames();
+
+        return jsonNode -> {
+            ObjectNode node = (ObjectNode) jsonNode;
+            int arity = fieldNames.length;
+            SeaTunnelRow row = new SeaTunnelRow(arity);
+            for (int i = 0; i < arity; i++) {
+                String fieldName = fieldNames[i];
+                JsonNode field = node.get(fieldName);
+                try {
+                    Object convertedField = convertField(fieldConverters[i], fieldName, field);
+                    row.setField(i, convertedField);
+                } catch (Throwable t) {
+                    throw new JsonParseException(
+                            String.format("Fail to deserialize at field: %s.", fieldName), t);
+                }
+            }
+            return row;
+        };
+    }
+
+    private Object convertField(
+        JsonToRowConverter fieldConverter, String fieldName, JsonNode field) {
+        if (field == null) {
+            if (failOnMissingField) {
+                throw new JsonParseException("Could not find field with name '" + fieldName + "'.");
+            } else {
+                return null;
+            }
+        } else {
+            return fieldConverter.convert(field);
+        }
+    }
+
+    private JsonToRowConverter wrapIntoNullableConverter(JsonToRowConverter converter) {
+        return jsonNode -> {
+            if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+                return null;
+            }
+            try {
+                return converter.convert(jsonNode);
+            } catch (Throwable t) {
+                if (!ignoreParseErrors) {
+                    throw t;
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface JsonToRowConverter extends Serializable {
+        Object convert(JsonNode jsonNode);
+    }
+
+    /** Exception which refers to parse errors in converters. */
+    private static final class JsonParseException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public JsonParseException(String message) {
+            super(message);
+        }
+
+        public JsonParseException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}