Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/01 13:02:14 UTC

[incubator-seatunnel] branch dev updated: [Improve]Use Jackson replace Fastjson (#1971)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 351713d6 [Improve]Use Jackson replace Fastjson (#1971)
351713d6 is described below

commit 351713d63c74fc046a4cb30b51ff9270cd10dd1a
Author: Kirs <ki...@apache.org>
AuthorDate: Wed Jun 1 21:02:09 2022 +0800

    [Improve]Use Jackson replace Fastjson (#1971)
    
    * remove fastjson
    
    * [Improve]Use Jackson replace Fastjson
    
    * fix license header
    
    * remove unused code
    
    * fix check style
    
    * Fix JsonNode conversion failure
    
    * fix check style
    
    * Fix conversion error
    
    * Fix FeishuClient
    
    Co-authored-by: ruanwenjun <we...@apache.org>
---
 pom.xml                                            |   8 +-
 seatunnel-apis/seatunnel-api-flink/pom.xml         |   6 +
 .../apache/seatunnel/flink/util/SchemaUtil.java    |  50 ++--
 seatunnel-apis/seatunnel-api-spark/pom.xml         |   5 +
 .../seatunnel/spark/utils/SparkStructTypeUtil.java |  16 +-
 seatunnel-common/pom.xml                           |  10 +-
 .../apache/seatunnel/common/utils/JsonUtils.java   | 282 +++++++++++++++++++++
 .../seatunnel-connector-flink-clickhouse/pom.xml   |   5 +
 .../clickhouse/sink/client/ClickhouseClient.java   |   6 +-
 .../seatunnel-connector-flink-druid/pom.xml        |   5 +
 .../flink/druid/sink/DruidOutputFormat.java        |  12 +-
 .../seatunnel-connector-flink-file/pom.xml         |   5 +
 .../flink/file/sink/JsonRowOutputFormat.java       |  27 +-
 .../seatunnel/flink/file/source/FileSource.java    |   9 +-
 .../flink/file/source/JsonRowInputFormat.java      |  22 +-
 .../flink/kafka/source/KafkaTableStream.java       |  15 +-
 .../seatunnel-connector-spark-feishu/pom.xml       |   5 -
 .../seatunnel/spark/feishu/FeishuClient.scala      |  59 +++--
 .../seatunnel-connector-spark-mongodb/pom.xml      |   5 +
 .../seatunnel/spark/mongodb/source/MongoDB.scala   |   5 +-
 seatunnel-dist/release-docs/LICENSE                |   1 -
 tools/dependencies/known-dependencies.txt          |   1 -
 22 files changed, 437 insertions(+), 122 deletions(-)
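
The migration applies a uniform mapping from Fastjson to Jackson tree types: JSONObject becomes ObjectNode, JSONArray becomes ArrayNode, and JSON.parseObject/JSON.parse become ObjectMapper.readTree (wrapped by the new JsonUtils helper introduced below). A minimal sketch of the mapping, for orientation only (class name and sample values are made up, not part of the commit):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class MigrationSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Fastjson: JSONObject obj = JSON.parseObject(json);
            ObjectNode obj = (ObjectNode) mapper.readTree(
                    "{\"name\":\"seatunnel\",\"tags\":[\"flink\",\"spark\"]}");
            String name = obj.get("name").asText();       // Fastjson: obj.getString("name")
            ArrayNode tags = (ArrayNode) obj.get("tags"); // Fastjson: obj.getJSONArray("tags")
            System.out.println(name + ": " + tags.get(0).asText());
        }
    }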

diff --git a/pom.xml b/pom.xml
index 90c6483d..81820bb8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,6 @@
         <hudi.version>0.10.0</hudi.version>
         <hadoop.binary.version>2.7</hadoop.binary.version>
         <hadoop.version>2.7.5</hadoop.version>
-        <fastjson.version>1.2.80</fastjson.version>
         <jackson.version>2.12.6</jackson.version>
         <lombok.version>1.18.0</lombok.version>
         <mysql.version>8.0.16</mysql.version>
@@ -410,12 +409,7 @@
                 <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
                 <version>${spark.version}</version>
             </dependency>
-
-            <dependency>
-                <groupId>com.alibaba</groupId>
-                <artifactId>fastjson</artifactId>
-                <version>${fastjson.version}</version>
-            </dependency>
+            
             <dependency>
                 <groupId>org.projectlombok</groupId>
                 <artifactId>lombok</artifactId>
diff --git a/seatunnel-apis/seatunnel-api-flink/pom.xml b/seatunnel-apis/seatunnel-api-flink/pom.xml
index 739b52ec..3afdc7cc 100644
--- a/seatunnel-apis/seatunnel-api-flink/pom.xml
+++ b/seatunnel-apis/seatunnel-api-flink/pom.xml
@@ -37,6 +37,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index 0c301513..8a4b79dd 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -17,13 +17,14 @@
 
 package org.apache.seatunnel.flink.util;
 
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.flink.enums.FormatType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -58,19 +59,19 @@ public final class SchemaUtil {
 
         switch (format) {
             case JSON:
-                getJsonSchema(schema, (JSONObject) info);
+                getJsonSchema(schema, (ObjectNode) info);
                 break;
             case CSV:
                 getCsvSchema(schema, (List<Map<String, String>>) info);
                 break;
             case ORC:
-                getOrcSchema(schema, (JSONObject) info);
+                getOrcSchema(schema, (ObjectNode) info);
                 break;
             case AVRO:
-                getAvroSchema(schema, (JSONObject) info);
+                getAvroSchema(schema, (ObjectNode) info);
                 break;
             case PARQUET:
-                getParquetSchema(schema, (JSONObject) info);
+                getParquetSchema(schema, (ObjectNode) info);
                 break;
             default:
         }
@@ -110,9 +111,9 @@ public final class SchemaUtil {
         return formatDescriptor;
     }
 
-    private static void getJsonSchema(Schema schema, JSONObject json) {
-
-        for (Map.Entry<String, Object> entry : json.entrySet()) {
+    private static void getJsonSchema(Schema schema, ObjectNode json) {
+        Map<String, Object> jsonMap = JsonUtils.toMap(json);
+        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
             String key = entry.getKey();
             Object value = entry.getValue();
             if (value instanceof String) {
@@ -123,12 +124,12 @@ public final class SchemaUtil {
                 schema.field(key, Types.LONG());
             } else if (value instanceof BigDecimal) {
                 schema.field(key, Types.JAVA_BIG_DEC());
-            } else if (value instanceof JSONObject) {
-                schema.field(key, getTypeInformation((JSONObject) value));
-            } else if (value instanceof JSONArray) {
-                Object obj = ((JSONArray) value).get(0);
-                if (obj instanceof JSONObject) {
-                    schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((JSONObject) obj)));
+            } else if (value instanceof ObjectNode) {
+                schema.field(key, getTypeInformation((ObjectNode) value));
+            } else if (value instanceof ArrayNode) {
+                Object obj = ((ArrayNode) value).get(0);
+                if (obj instanceof ObjectNode) {
+                    schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((ObjectNode) obj)));
                 } else {
                     schema.field(key, ObjectArrayTypeInfo.getInfoFor(Object[].class, TypeInformation.of(Object.class)));
                 }
@@ -161,7 +162,7 @@ public final class SchemaUtil {
      * @param schema schema
      * @param json   json
      */
-    private static void getOrcSchema(Schema schema, JSONObject json) {
+    private static void getOrcSchema(Schema schema, ObjectNode json) {
 
     }
 
@@ -171,11 +172,11 @@ public final class SchemaUtil {
      * @param schema schema
      * @param json   json
      */
-    private static void getParquetSchema(Schema schema, JSONObject json) {
+    private static void getParquetSchema(Schema schema, ObjectNode json) {
 
     }
 
-    private static void getAvroSchema(Schema schema, JSONObject json) {
+    private static void getAvroSchema(Schema schema, ObjectNode json) {
         RowTypeInfo typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(json.toString());
         String[] fieldNames = typeInfo.getFieldNames();
         for (String name : fieldNames) {
@@ -183,12 +184,13 @@ public final class SchemaUtil {
         }
     }
 
-    public static RowTypeInfo getTypeInformation(JSONObject json) {
+    public static RowTypeInfo getTypeInformation(ObjectNode json) {
         int size = json.size();
         String[] fields = new String[size];
         TypeInformation<?>[] informations = new TypeInformation[size];
+        Map<String, Object> jsonMap = JsonUtils.toMap(json);
         int i = 0;
-        for (Map.Entry<String, Object> entry : json.entrySet()) {
+        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
             String key = entry.getKey();
             Object value = entry.getValue();
             fields[i] = key;
@@ -200,10 +202,10 @@ public final class SchemaUtil {
                 informations[i] = Types.LONG();
             } else if (value instanceof BigDecimal) {
                 informations[i] = Types.JAVA_BIG_DEC();
-            } else if (value instanceof JSONObject) {
-                informations[i] = getTypeInformation((JSONObject) value);
-            } else if (value instanceof JSONArray) {
-                JSONObject demo = ((JSONArray) value).getJSONObject(0);
+            } else if (value instanceof ObjectNode) {
+                informations[i] = getTypeInformation((ObjectNode) value);
+            } else if (value instanceof ArrayNode) {
+                ObjectNode demo = (ObjectNode) ((ArrayNode) value).get(0);
                 informations[i] = ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation(demo));
             }
             i++;
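
Jackson's ObjectNode does not implement Map, so the entrySet() loops above now go through JsonUtils.toMap(JsonNode), which delegates to ObjectMapper.convertValue. A sketch of that conversion with a bare mapper (illustrative; note that after the conversion, nested objects come back as LinkedHashMap and arrays as ArrayList, not as ObjectNode/ArrayNode):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class EntryIterationSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode json = mapper.readTree("{\"id\":1,\"name\":\"st\",\"nested\":{\"a\":true}}");
            Map<String, Object> asMap =
                    mapper.convertValue(json, new TypeReference<Map<String, Object>>() { });
            for (Map.Entry<String, Object> entry : asMap.entrySet()) {
                // id -> Integer, name -> String, nested -> LinkedHashMap
                System.out.println(entry.getKey() + " -> " + entry.getValue().getClass().getSimpleName());
            }
        }
    }
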
diff --git a/seatunnel-apis/seatunnel-api-spark/pom.xml b/seatunnel-apis/seatunnel-api-spark/pom.xml
index 29242201..8f4e7ca0 100644
--- a/seatunnel-apis/seatunnel-api-spark/pom.xml
+++ b/seatunnel-apis/seatunnel-api-spark/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>seatunnel-api-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
index e3b0542c..27833f6d 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.spark.utils;
 
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.utils.JsonUtils;
 
-import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
@@ -32,13 +33,14 @@ public final class SparkStructTypeUtil {
     private SparkStructTypeUtil() {
     }
 
-    public static StructType getStructType(StructType schema, JSONObject json) {
+    public static StructType getStructType(StructType schema, ObjectNode json) {
         StructType newSchema = schema.copy(schema.fields());
-        for (Map.Entry<String, Object> entry : json.entrySet()) {
+        Map<String, Object> jsonMap = JsonUtils.toMap(json);
+        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
             String field = entry.getKey();
             Object type = entry.getValue();
-            if (type instanceof JSONObject) {
-                StructType st = getStructType(new StructType(), (JSONObject) type);
+            if (type instanceof ObjectNode) {
+                StructType st = getStructType(new StructType(), (ObjectNode) type);
                 newSchema = newSchema.add(field, st);
             } else if (type instanceof List) {
                 List list = (List) type;
@@ -47,8 +49,8 @@ public final class SparkStructTypeUtil {
                     newSchema = newSchema.add(field, DataTypes.createArrayType(null, true));
                 } else {
                     Object o = list.get(0);
-                    if (o instanceof JSONObject) {
-                        StructType st = getStructType(new StructType(), (JSONObject) o);
+                    if (o instanceof ObjectNode) {
+                        StructType st = getStructType(new StructType(), (ObjectNode) o);
                         newSchema = newSchema.add(field, DataTypes.createArrayType(st, true));
                     } else {
                         DataType st = getType(o.toString());
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index da0b169d..81b3eb7c 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -34,11 +34,6 @@
             <artifactId>seatunnel-config-shade</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -47,7 +42,10 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
new file mode 100644
index 00000000..5f362059
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+public class JsonUtils {
+
+    /**
+     * can use static singleton, inject: just make sure to reuse!
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setTimeZone(TimeZone.getDefault());
+
+    private JsonUtils() {
+        throw new UnsupportedOperationException("Construct JSONUtils");
+    }
+
+    public static ArrayNode createArrayNode() {
+        return OBJECT_MAPPER.createArrayNode();
+    }
+
+    public static ObjectNode createObjectNode() {
+        return OBJECT_MAPPER.createObjectNode();
+    }
+
+    public static JsonNode toJsonNode(Object obj) {
+        return OBJECT_MAPPER.valueToTree(obj);
+    }
+
+    /**
+     * json representation of object
+     *
+     * @param object  object
+     * @param feature feature
+     * @return object to json string
+     */
+    public static String toJsonString(Object object, SerializationFeature feature) {
+        try {
+            ObjectWriter writer = OBJECT_MAPPER.writer(feature);
+            return writer.writeValueAsString(object);
+        } catch (Exception e) {
+            throw new RuntimeException("Object to json exception!", e);
+        }
+    }
+
+    /**
+     * This method deserializes the specified Json into an object of the specified class. It is not
+     * suitable to use if the specified class is a generic type since it will not have the generic
+     * type information because of the Type Erasure feature of Java. Therefore, this method should not
+     * be used if the desired type is a generic type. Note that this method works fine if any of
+     * the fields of the specified object are generics, just the object itself should not be a
+     * generic type.
+     *
+     * @param json  the string from which the object is to be deserialized
+     * @param clazz the class of T
+     * @param <T>   T
+     * @return an object of type T from the string
+     * classOfT
+     */
+    public static <T> T parseObject(String json, Class<T> clazz) {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(json, clazz);
+        } catch (Exception e) {
+            throw new RuntimeException("Json parse object exception!", e);
+        }
+    }
+
+    /**
+     * json to list
+     *
+     * @param json  json string
+     * @param clazz class
+     * @param <T>   T
+     * @return list
+     */
+    public static <T> List<T> toList(String json, Class<T> clazz) {
+        if (StringUtils.isEmpty(json)) {
+            return Collections.emptyList();
+        }
+
+        try {
+            CollectionType listType = OBJECT_MAPPER.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
+            return OBJECT_MAPPER.readValue(json, listType);
+        } catch (Exception e) {
+            throw new RuntimeException("Json parse list exception!", e);
+        }
+    }
+
+    /**
+     * Method for finding a JSON Object field with specified name in this
+     * node or its child nodes, and returning value it has.
+     * If no matching field is found in this node or its descendants, returns null.
+     *
+     * @param jsonNode  json node
+     * @param fieldName Name of field to look for
+     * @return Value of first matching node found, if any; null if none
+     */
+    public static String findValue(JsonNode jsonNode, String fieldName) {
+        JsonNode node = jsonNode.findValue(fieldName);
+
+        if (node == null) {
+            return null;
+        }
+
+        return node.asText();
+    }
+
+    /**
+     * json to map
+     * {@link #toMap(String, Class, Class)}
+     *
+     * @param json json
+     * @return json to map
+     */
+    public static Map<String, String> toMap(String json) {
+        return parseObject(json, new TypeReference<Map<String, String>>() {
+        });
+    }
+
+    public static Map<String, Object> toMap(JsonNode jsonNode) {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.convertValue(jsonNode, new TypeReference<Map<String, Object>>() {
+        });
+    }
+
+    /**
+     * json to map
+     *
+     * @param json   json
+     * @param classK classK
+     * @param classV classV
+     * @param <K>    K
+     * @param <V>    V
+     * @return to map
+     */
+    public static <K, V> Map<K, V> toMap(String json, Class<K> classK, Class<V> classV) {
+        if (StringUtils.isEmpty(json)) {
+            return Collections.emptyMap();
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(json, new TypeReference<Map<K, V>>() {
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("json to map exception!", e);
+        }
+    }
+
+    /**
+     * json to object
+     *
+     * @param json json string
+     * @param type type reference
+     * @param <T> type
+     * @return return parse object
+     */
+    public static <T> T parseObject(String json, TypeReference<T> type) {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(json, type);
+        } catch (Exception e) {
+            throw new RuntimeException("Json parse object exception.", e);
+        }
+    }
+
+    /**
+     * object to json string
+     *
+     * @param object object
+     * @return json string
+     */
+    public static String toJsonString(Object object) {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(object);
+        } catch (Exception e) {
+            throw new RuntimeException("Object json deserialization exception.", e);
+        }
+    }
+
+    public static ObjectNode parseObject(String text) {
+        try {
+            if (text.isEmpty()) {
+                return parseObject(text, ObjectNode.class);
+            } else {
+                return (ObjectNode) OBJECT_MAPPER.readTree(text);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("String json deserialization exception.", e);
+        }
+    }
+
+    public static ArrayNode parseArray(String text) {
+        try {
+            return (ArrayNode) OBJECT_MAPPER.readTree(text);
+        } catch (Exception e) {
+            throw new RuntimeException("Json deserialization exception.", e);
+        }
+    }
+
+    /**
+     * json serializer
+     */
+    public static class JsonDataSerializer extends JsonSerializer<String> {
+
+        @Override
+        public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+            gen.writeRawValue(value);
+        }
+
+    }
+
+    /**
+     * json data deserializer
+     */
+    public static class JsonDataDeserializer extends JsonDeserializer<String> {
+
+        @Override
+        public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+            JsonNode node = p.getCodec().readTree(p);
+            if (node instanceof TextNode) {
+                return node.asText();
+            } else {
+                return node.toString();
+            }
+        }
+
+    }
+}
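
For reference, a short usage sketch of the JsonUtils facade added above (sample values made up):

    import org.apache.seatunnel.common.utils.JsonUtils;

    import com.fasterxml.jackson.databind.node.ObjectNode;

    import java.util.List;
    import java.util.Map;

    public class JsonUtilsUsage {
        public static void main(String[] args) {
            ObjectNode node = JsonUtils.parseObject("{\"host\":\"localhost\",\"port\":9092}");
            System.out.println(JsonUtils.findValue(node, "host"));     // localhost

            Map<String, String> map = JsonUtils.toMap("{\"a\":\"1\",\"b\":\"2\"}");
            List<Integer> list = JsonUtils.toList("[1,2,3]", Integer.class);
            System.out.println(JsonUtils.toJsonString(map) + " " + list);
        }
    }
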
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
index 44034e57..cd86170b 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
@@ -36,6 +36,11 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index 012e1403..ab91a6dd 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -24,14 +24,14 @@ import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
 import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
 
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
 import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
 import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.fasterxml.jackson.core.type.TypeReference;
 import ru.yandex.clickhouse.BalancedClickhouseDataSource;
 import ru.yandex.clickhouse.ClickHouseConnection;
 import ru.yandex.clickhouse.ClickHouseConnectionImpl;
@@ -191,7 +191,7 @@ public class ClickhouseClient {
             String engine = resultSet.getString(1);
             String createTableDDL = resultSet.getString(2);
             String engineFull = resultSet.getString(3);
-            List<String> dataPaths = JSON.parseObject(resultSet.getString(4).replaceAll("'", "\""), new TypeReference<List<String>>() {
+            List<String> dataPaths = JsonUtils.parseObject(resultSet.getString(4).replaceAll("'", "\""), new TypeReference<List<String>>() {
             });
             DistributedEngine distributedEngine = null;
             if ("Distributed".equals(engine)) {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
index 4eb30c94..7814f375 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
@@ -36,6 +36,11 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
index 1adee168..dca0c26c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
@@ -20,12 +20,13 @@ package org.apache.seatunnel.flink.druid.sink;
 import static org.apache.flink.api.java.io.CsvInputFormat.DEFAULT_FIELD_DELIMITER;
 import static org.apache.flink.api.java.io.CsvInputFormat.DEFAULT_LINE_DELIMITER;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -122,14 +123,15 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
         mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
         mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         String taskJSON = mapper.writeValueAsString(indexTask);
-        JSONObject jsonObject = JSON.parseObject(taskJSON);
+        ObjectNode jsonObject = JsonUtils.parseObject(taskJSON);
         jsonObject.remove("id");
         jsonObject.remove("groupId");
         jsonObject.remove("resource");
-        JSONObject spec = jsonObject.getJSONObject("spec");
+
+        ObjectNode spec = (ObjectNode) jsonObject.get("spec");
         spec.remove("tuningConfig");
         jsonObject.put("spec", spec);
-        taskJSON = jsonObject.toJSONString();
+        taskJSON = jsonObject.toString();
 
         URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
         HttpURLConnection con = (HttpURLConnection) url.openConnection();
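
Pruning fields from the serialized Druid task now uses ObjectNode's remove/get in place of JSONObject's remove/getJSONObject. A minimal sketch (payload made up; note the child node is a live reference into the parent tree, so removals on it are visible without putting it back):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class PruneSketch {
        public static void main(String[] args) throws Exception {
            ObjectNode task = (ObjectNode) new ObjectMapper().readTree(
                    "{\"id\":\"x\",\"groupId\":\"g\",\"spec\":{\"dataSchema\":{},\"tuningConfig\":{}}}");
            task.remove("id");
            task.remove("groupId");
            ObjectNode spec = (ObjectNode) task.get("spec"); // Fastjson: task.getJSONObject("spec")
            spec.remove("tuningConfig");
            System.out.println(task); // {"spec":{"dataSchema":{}}}
        }
    }
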
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
index 3d99c82d..31bdf4f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
@@ -36,6 +36,11 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
index bc2fc169..efcbceae 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
@@ -17,8 +17,12 @@
 
 package org.apache.seatunnel.flink.file.sink;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -31,7 +35,6 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.Arrays;
 
 public class JsonRowOutputFormat extends FileOutputFormat<Row> {
 
@@ -86,37 +89,39 @@ public class JsonRowOutputFormat extends FileOutputFormat<Row> {
 
     @Override
     public void writeRecord(Row record) throws IOException {
-        final JSONObject json = getJson(record, rowTypeInfo);
+        final ObjectNode json = getJson(record, rowTypeInfo);
         byte[] bytes = json.toString().getBytes(charset);
         this.stream.write(bytes);
         this.stream.write(NEWLINE);
     }
 
-    private JSONObject getJson(Row record, RowTypeInfo rowTypeInfo) {
+    private ObjectNode getJson(Row record, RowTypeInfo rowTypeInfo) {
         String[] fieldNames = rowTypeInfo.getFieldNames();
         int i = 0;
-        JSONObject json = new JSONObject();
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode json = mapper.createObjectNode();
         for (String name : fieldNames) {
             Object field = record.getField(i);
+            JsonNode fieldNode = JsonUtils.toJsonNode(field);
             final TypeInformation type = rowTypeInfo.getTypeAt(i);
             if (type instanceof AtomicType) {
-                json.put(name, field);
+                json.set(name, fieldNode);
             } else if (type instanceof ObjectArrayTypeInfo) {
                 ObjectArrayTypeInfo arrayTypeInfo = (ObjectArrayTypeInfo) type;
                 TypeInformation componentInfo = arrayTypeInfo.getComponentInfo();
-                JSONArray jsonArray = new JSONArray();
+                ArrayNode jsonArray = mapper.createArrayNode();
                 if (componentInfo instanceof RowTypeInfo) {
                     final Row[] rows = (Row[]) field;
                     for (Row r : rows) {
                         jsonArray.add(getJson(r, (RowTypeInfo) componentInfo));
                     }
                 } else {
-                    jsonArray.addAll(Arrays.asList((Object[]) field));
+                    jsonArray.add(fieldNode);
                 }
-                json.put(name, jsonArray);
+                json.set(name, jsonArray);
             } else if (type instanceof RowTypeInfo) {
                 RowTypeInfo typeInfo = (RowTypeInfo) type;
-                json.put(name, getJson((Row) field, typeInfo));
+                json.set(name, getJson((Row) field, typeInfo));
             }
             i++;
         }
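
In Jackson, ObjectNode.put is reserved for scalar overloads and put(String, JsonNode) is deprecated, which is why child attachment above switches to set(...). A small sketch of the distinction:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class PutVsSetSketch {
        public static void main(String[] args) {
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode row = mapper.createObjectNode();
            row.put("name", "seatunnel");   // scalars: put(String, String/int/long/boolean/...)
            ArrayNode tags = mapper.createArrayNode();
            tags.add("flink").add("spark");
            row.set("tags", tags);          // JsonNode children go through set(...)
            System.out.println(row);        // {"name":"seatunnel","tags":["flink","spark"]}
        }
    }
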
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index eafae0ae..32a208a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.file.source;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSource;
@@ -27,8 +28,8 @@ import org.apache.seatunnel.flink.util.SchemaUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.service.AutoService;
 import org.apache.avro.Schema;
 import org.apache.flink.api.common.io.InputFormat;
@@ -94,7 +95,7 @@ public class FileSource implements FlinkBatchSource {
         Path filePath = new Path(path);
         switch (format) {
             case JSON:
-                JSONObject jsonSchemaInfo = JSONObject.parseObject(config.getString(SCHEMA));
+                ObjectNode jsonSchemaInfo = JsonUtils.parseObject(config.getString(SCHEMA));
                 RowTypeInfo jsonInfo = SchemaUtil.getTypeInformation(jsonSchemaInfo);
                 inputFormat = new JsonRowInputFormat(filePath, null, jsonInfo);
                 break;
@@ -107,7 +108,7 @@ public class FileSource implements FlinkBatchSource {
                 this.inputFormat = new OrcRowInputFormat(path, config.getString(SCHEMA), null, DEFAULT_BATCH_SIZE);
                 break;
             case CSV:
-                List<Map<String, String>> csvSchemaInfo = JSONObject.parseObject(config.getString(SCHEMA),
+                List<Map<String, String>> csvSchemaInfo = JsonUtils.parseObject(config.getString(SCHEMA),
                         new TypeReference<List<Map<String, String>>>() {
                         });
                 TypeInformation<?>[] csvType = SchemaUtil.getCsvType(csvSchemaInfo);
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
index 1fcf26e1..bac38358 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.flink.file.source;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -57,7 +59,7 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
         }
 
         String str = new String(bytes, offset, numBytes, this.charsetName);
-        JSONObject json = JSONObject.parseObject(str);
+        ObjectNode json = JsonUtils.parseObject(str);
         Row reuseRow;
         if (reuse == null) {
             reuseRow = new Row(rowTypeInfo.getArity());
@@ -68,26 +70,26 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
         return reuseRow;
     }
 
-    private void setJsonRow(Row row, JSONObject json, RowTypeInfo rowTypeInfo) {
+    private void setJsonRow(Row row, ObjectNode json, RowTypeInfo rowTypeInfo) {
         String[] fieldNames = rowTypeInfo.getFieldNames();
         int i = 0;
         for (String name : fieldNames) {
             Object value = json.get(name);
-            if (value instanceof JSONObject) {
+            if (value instanceof ObjectNode) {
                 TypeInformation information = rowTypeInfo.getTypeAt(name);
                 Row r = new Row(information.getArity());
-                setJsonRow(r, (JSONObject) value, (RowTypeInfo) information);
+                setJsonRow(r, (ObjectNode) value, (RowTypeInfo) information);
                 row.setField(i++, r);
-            } else if (value instanceof JSONArray) {
+            } else if (value instanceof ArrayNode) {
                 ObjectArrayTypeInfo information = (ObjectArrayTypeInfo) rowTypeInfo.getTypeAt(name);
-                JSONArray array = (JSONArray) value;
+                ArrayNode array = (ArrayNode) value;
                 Object[] objects = new Object[array.size()];
                 int j = 0;
                 for (Object o : array) {
-                    if (o instanceof JSONObject) {
+                    if (o instanceof ObjectNode) {
                         TypeInformation componentInfo = information.getComponentInfo();
                         Row r = new Row(componentInfo.getArity());
-                        setJsonRow(r, (JSONObject) o, (RowTypeInfo) componentInfo);
+                        setJsonRow(r, (ObjectNode) o, (RowTypeInfo) componentInfo);
                         objects[j++] = r;
                     } else {
                         objects[j++] = o;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 69951f9f..50382413 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.PropertiesUtil;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.enums.FormatType;
@@ -30,8 +31,7 @@ import org.apache.seatunnel.flink.util.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.parser.Feature;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.auto.service.AutoService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -45,13 +45,13 @@ import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 @AutoService(BaseFlinkSource.class)
 public class KafkaTableStream implements FlinkStreamSource {
 
-    private static final long   serialVersionUID = 5287018194573371428L;
+    private static final long serialVersionUID = 5287018194573371428L;
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableStream.class);
 
     private Config config;
@@ -111,7 +111,7 @@ public class KafkaTableStream implements FlinkStreamSource {
         }
         String schemaContent = config.getString(SCHEMA);
         format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
-        schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
+        schemaInfo = JsonUtils.parseObject(schemaContent);
     }
 
     @Override
@@ -158,9 +158,10 @@ public class KafkaTableStream implements FlinkStreamSource {
                     kafka.startFromEarliest();
                     break;
                 case "specific":
+                    //todo Is json format?
                     String offset = config.getString("offset.reset.specific");
-                    HashMap<Integer, Long> map = new HashMap<>(DEFAULT_INITIAL_CAPACITY);
-                    JSONObject.parseObject(offset).forEach((k, v) -> map.put(Integer.valueOf(k), Long.valueOf(v.toString())));
+                    Map<Integer, Long> map = JsonUtils.parseObject(offset, new TypeReference<Map<Integer, Long>>() {
+                    });
                     kafka.startFromSpecificOffsets(map);
                     break;
                 default:
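
The specific-offsets branch now parses the whole map in one call instead of walking a JSONObject by hand. JSON object keys are always strings, so Jackson's key deserializer performs the Integer conversion that the old Integer.valueOf(k) loop did explicitly. A sketch (offset string made up):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class OffsetsSketch {
        public static void main(String[] args) throws Exception {
            String offset = "{\"0\":100,\"1\":200}";
            Map<Integer, Long> map = new ObjectMapper()
                    .readValue(offset, new TypeReference<Map<Integer, Long>>() { });
            System.out.println(map.get(0) + " " + map.get(1)); // 100 200
        }
    }
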
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
index fbe4353c..039d8da2 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
@@ -51,10 +51,5 @@
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
index 9033d8a0..27063b45 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
@@ -16,17 +16,19 @@
  */
 package org.apache.seatunnel.spark.feishu
 
-import scala.collection.mutable.ArrayBuffer
-
-import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
-import org.apache.http.{HttpEntity, HttpHeaders}
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}
 import org.apache.http.client.methods.{CloseableHttpResponse, RequestBuilder}
 import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
 import org.apache.http.util.EntityUtils
+import org.apache.http.{HttpEntity, HttpHeaders}
+import org.apache.seatunnel.common.utils.JsonUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.collection.mutable.ArrayBuffer
+
 class FeishuClient(appId: String, appSecret: String) {
   val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
@@ -34,7 +36,7 @@ class FeishuClient(appId: String, appSecret: String) {
     val url = Config.TOKEN_URL.format(appId, appSecret)
     val result = this.requestFeishuApi(url, null)
     logger.info(s"Request token and get result $result")
-    result.getString(Config.TENANT_ACCESS_TOKEN)
+    result.get(Config.TENANT_ACCESS_TOKEN).toString
   }
 
   def getSheetId(sheetToken: String, sheetNum: Int): String = {
@@ -45,40 +47,41 @@ class FeishuClient(appId: String, appSecret: String) {
         "Did not get any sheet in Feishu, please make sure there is correct sheet token!")
     }
 
-    val sheets = data.getJSONArray(Config.SHEETS)
+    val sheets = data.get(Config.SHEETS)
     if (null == sheets || sheets.size() < sheetNum) {
       throw new RuntimeException(s"The sheet $sheetNum is does not exists")
     }
 
-    val sheetInfo = sheets.getJSONObject(sheetNum - 1)
-    sheetInfo.getString(Config.SHEET_ID)
+    val sheetInfo = sheets.get(sheetNum - 1)
+    sheetInfo.get(Config.SHEET_ID).toString
   }
 
-  def getSheetData(sheetToken: String, range: String, sheetId: String): JSONArray = {
+  def getSheetData(sheetToken: String, range: String, sheetId: String): ArrayNode = {
     var rangeNew = range
     // the range format is xxx!A1:C3
     if (!"".equals(rangeNew)) {
       rangeNew = "!" + rangeNew
     }
     val url = Config.SHEET_DATA_URL.format(sheetToken, sheetId, rangeNew)
-    val data = this.requestFeishuApiAndGetData(url)
+    val data = this.requestFeishuApiAndGetData(url).asInstanceOf[ArrayNode]
     if (null == data) {
       throw new RuntimeException("The data is empty, please make sure some data in sheet.")
     }
 
-    val valueRange = data.getJSONObject(Config.VALUE_RANGE)
+    val valueRange = data.get(Config.VALUE_RANGE)
     if (null == valueRange) {
       throw new RuntimeException("The data is empty, please make sure some data in sheet.")
     }
-    valueRange.getJSONArray(Config.VALUES)
+    valueRange.get(Config.VALUES).asInstanceOf[ArrayNode]
   }
 
+  // todo: add UT
   def getDataset(
-      sheetToken: String,
-      range: String,
-      titleLineNum: Int,
-      ignoreTitleLine: Boolean,
-      sheetNum: Int): (ArrayBuffer[Row], StructType) = {
+                  sheetToken: String,
+                  range: String,
+                  titleLineNum: Int,
+                  ignoreTitleLine: Boolean,
+                  sheetNum: Int): (ArrayBuffer[Row], StructType) = {
     val sheetId = this.getSheetId(sheetToken, sheetNum)
     val values = getSheetData(sheetToken, range, sheetId)
     if (values.size() < titleLineNum) {
@@ -92,24 +95,24 @@ class FeishuClient(appId: String, appSecret: String) {
     }
 
     var schema: StructType = null
-    val schemaData = values.getJSONArray(titleLineNum - 1)
+    val schemaData = values.get(titleLineNum - 1).asInstanceOf[ArrayNode]
     val fields = ArrayBuffer[StructField]()
     for (index <- 0 until schemaData.size()) {
-      val titleName = schemaData.getString(index)
+      val titleName = schemaData.get(index)
       if (null == titleName) {
         throw new RuntimeException("The title name is not allowed null")
       }
-      val field = DataTypes.createStructField(titleName, DataTypes.StringType, true)
+      val field = DataTypes.createStructField(titleName.toString, DataTypes.StringType, true)
       fields += field
     }
     schema = DataTypes.createStructType(fields.toArray)
 
     val rows = ArrayBuffer[Row]()
     for (index <- start until values.size()) {
-      val jsonArr = values.getJSONArray(index)
+      val jsonArr = values.get(index).asInstanceOf[ArrayNode]
       val arr = ArrayBuffer[String]()
       for (indexInner <- 0 until jsonArr.size()) {
-        arr += jsonArr.getString(indexInner)
+        arr += jsonArr.get(indexInner).toString
       }
 
       val row = Row.fromSeq(arr)
@@ -118,12 +121,12 @@ class FeishuClient(appId: String, appSecret: String) {
     (rows, schema)
   }
 
-  def requestFeishuApiAndGetData(url: String): JSONObject = {
+  def requestFeishuApiAndGetData(url: String): JsonNode = {
     val result = this.requestFeishuApi(url, this.getToken)
-    result.getJSONObject(Config.DATA)
+    result.get(Config.DATA)
   }
 
-  def requestFeishuApi(url: String, token: String): JSONObject = {
+  def requestFeishuApi(url: String, token: String): ObjectNode = {
     val httpGet = RequestBuilder.get()
       .setUri(url)
       .setHeader(HttpHeaders.AUTHORIZATION, s"Bearer $token")
@@ -148,10 +151,10 @@ class FeishuClient(appId: String, appSecret: String) {
       }
     }
 
-    val result = JSON.parseObject(resultStr)
-    val code = result.getIntValue(Config.CODE)
+    val result = JsonUtils.parseObject(resultStr)
+    val code = result.get(Config.CODE).asInt()
     if (code != 0) {
-      val errorMessage = result.getString(Config.MSG)
+      val errorMessage = result.get(Config.MSG).toString
       throw new RuntimeException(
         s"Request feishu api error, the code is: $code and msg is: $errorMessage")
     }
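
One behavioural difference to keep in mind when replacing Fastjson's getString: JsonNode.toString() serializes the node, so a text node keeps its JSON quotes, while asText() yields the raw string; asInt()/asLong() coerce numeric nodes, as used for Config.CODE above. A sketch of the difference:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class AsTextSketch {
        public static void main(String[] args) throws Exception {
            JsonNode result = new ObjectMapper().readTree("{\"code\":0,\"msg\":\"ok\"}");
            System.out.println(result.get("msg").toString()); // "ok" (quoted JSON)
            System.out.println(result.get("msg").asText());   // ok   (raw text)
            System.out.println(result.get("code").asInt());   // 0
        }
    }
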
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
index 75430b85..da7c82ad 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
@@ -32,6 +32,11 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.mongodb.spark</groupId>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
index e565d414..61b2d939 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
@@ -17,11 +17,10 @@
 package org.apache.seatunnel.spark.mongodb.source
 
 import scala.collection.JavaConversions._
-
-import com.alibaba.fastjson.JSON
 import com.mongodb.spark.MongoSpark
 import com.mongodb.spark.config.ReadConfig
 import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.utils.JsonUtils
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
 import org.apache.seatunnel.spark.utils.SparkStructTypeUtil
@@ -49,7 +48,7 @@ class MongoDB extends SparkBatchSource {
       })
 
     if (config.hasPath("schema")) {
-      val schemaJson = JSON.parseObject(config.getString("schema"))
+      val schemaJson = JsonUtils.parseObject(config.getString("schema"))
       schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
     }
 
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 9471de2a..f0964b6a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -267,7 +267,6 @@ The text of each license is the standard Apache 2.0 license.
      (Apache 2) chill-java (com.twitter:chill-java:0.7.6 - https://github.com/twitter/chill)
      (Apache 2) chill-java (com.twitter:chill-java:0.8.4 - https://github.com/twitter/chill)
      (Apache 2) chill-java (com.twitter:chill-java:0.9.3 - https://github.com/twitter/chill)
-     (Apache 2) fastjson (com.alibaba:fastjson:1.2.80 - https://github.com/alibaba/fastjson)
      (Apache 2) opencsv (com.opencsv:opencsv:4.6 - http://opencsv.sf.net)
      (Apache 2) opencsv (net.sf.opencsv:opencsv:2.3 - http://opencsv.sf.net)
      (Apache 2) org.roaringbitmap:RoaringBitmap (org.roaringbitmap:RoaringBitmap:0.9.0 - https://github.com/RoaringBitmap/RoaringBitmap)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 631e319c..066afbd8 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -165,7 +165,6 @@ error_prone_annotations-2.3.4.jar
 error_prone_annotations-2.8.0.jar
 esri-geometry-api-2.2.0.jar
 extendedset-0.22.1.jar
-fastjson-1.2.80.jar
 fastutil-6.5.6.jar
 fastutil-7.0.13.jar
 fastutil-8.5.4.jar