You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/07 07:40:20 UTC

[inlong] branch master updated: [INLONG-6068][Manager] Optimize the usage of ObjectMapper (#6084)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e34ebdc [INLONG-6068][Manager] Optimize the usage of ObjectMapper (#6084)
e4e34ebdc is described below

commit e4e34ebdc6ee6a65e2cf96f02336dc22d8a477a9
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Fri Oct 7 15:40:15 2022 +0800

    [INLONG-6068][Manager] Optimize the usage of ObjectMapper (#6084)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../org/apache/inlong/manager/client/BaseExample.java  |  9 +++------
 .../inlong/manager/client/File2HBaseExample.java       | 11 ++++++-----
 .../inlong/manager/client/File2IcebergExample.java     | 15 ++++++++-------
 .../manager/client/cli/AbstractCommandRunner.java      |  9 ---------
 .../inlong/manager/client/cli/CreateCommand.java       | 11 +++++++----
 .../inlong/manager/client/cli/UpdateCommand.java       | 10 ++++++----
 .../inlong/manager/client/cli/util/PrintUtils.java     | 14 +++++---------
 .../client/api/inner/client/WorkflowClient.java        |  6 ++----
 .../manager/client/api/util/InlongGroupTransfer.java   |  8 +++-----
 .../inlong/manager/common/auth/Authentication.java     |  3 ---
 .../manager/common/auth/DefaultAuthentication.java     |  3 ++-
 .../manager/common/auth/SecretAuthentication.java      |  3 ++-
 .../manager/common/auth/SecretTokenAuthentication.java |  3 ++-
 .../manager/common/auth/TokenAuthentication.java       |  3 ++-
 .../apache/inlong/manager/common/util/JsonUtils.java   |  8 ++++++++
 .../inlong/manager/plugin/flink/FlinkOperation.java    | 18 +++++++++---------
 .../manager/plugin/listener/DeleteSortListener.java    |  8 +++-----
 .../manager/plugin/listener/DeleteStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/RestartSortListener.java   |  8 +++-----
 .../manager/plugin/listener/RestartStreamListener.java | 15 +++++++--------
 .../manager/plugin/listener/StartupSortListener.java   |  8 +++-----
 .../manager/plugin/listener/StartupStreamListener.java |  8 +++-----
 .../manager/plugin/listener/SuspendSortListener.java   |  8 +++-----
 .../manager/plugin/listener/SuspendStreamListener.java | 16 ++++++++--------
 .../plugin/listener/DeleteSortListenerTest.java        | 11 +++++------
 .../plugin/listener/RestartSortListenerTest.java       |  7 +++----
 .../plugin/listener/StartupSortListenerTest.java       |  7 +++----
 .../plugin/listener/SuspendSortListenerTest.java       |  7 +++----
 .../manager/pojo/cluster/agent/AgentClusterDTO.java    |  8 ++------
 .../pojo/cluster/dataproxy/DataProxyClusterDTO.java    |  8 ++------
 .../manager/pojo/cluster/kafka/KafkaClusterDTO.java    |  8 ++------
 .../manager/pojo/cluster/pulsar/PulsarClusterDTO.java  |  8 ++------
 .../manager/pojo/cluster/tubemq/TubeClusterDTO.java    |  8 ++------
 .../manager/pojo/consume/pulsar/ConsumePulsarDTO.java  |  8 ++------
 .../manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java  |  8 ++------
 .../manager/pojo/group/kafka/InlongKafkaDTO.java       |  7 ++-----
 .../manager/pojo/group/pulsar/InlongPulsarDTO.java     |  8 ++------
 .../manager/pojo/node/ck/ClickHouseDataNodeDTO.java    |  9 +++------
 .../inlong/manager/pojo/node/hive/HiveDataNodeDTO.java |  8 ++------
 .../manager/pojo/node/iceberg/IcebergDataNodeDTO.java  |  8 ++------
 .../manager/pojo/node/mysql/MySQLDataNodeDTO.java      |  8 ++------
 .../inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java |  8 ++------
 .../pojo/sink/dlciceberg/DLCIcebergSinkDTO.java        |  8 ++------
 .../manager/pojo/sink/es/ElasticsearchFieldInfo.java   |  8 ++------
 .../manager/pojo/sink/es/ElasticsearchSinkDTO.java     |  8 ++------
 .../manager/pojo/sink/greenplum/GreenplumSinkDTO.java  |  8 ++------
 .../manager/pojo/sink/hbase/HBaseColumnFamilyInfo.java |  8 ++------
 .../inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java   |  8 ++------
 .../inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java     |  8 ++------
 .../inlong/manager/pojo/sink/hive/HiveSinkDTO.java     |  8 ++------
 .../manager/pojo/sink/iceberg/IcebergColumnInfo.java   |  8 ++------
 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java      |  8 ++------
 .../inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java   |  8 ++------
 .../inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java   |  7 ++-----
 .../inlong/manager/pojo/sink/oracle/OracleSinkDTO.java |  7 ++-----
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java        |  8 ++------
 .../manager/pojo/sink/sqlserver/SQLServerSinkDTO.java  |  8 ++------
 .../sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java   |  8 ++------
 .../inlong/manager/pojo/source/SubSourceDTO.java       |  8 ++------
 .../pojo/source/autopush/AutoPushSourceDTO.java        |  8 ++------
 .../inlong/manager/pojo/source/file/FileSourceDTO.java |  8 ++------
 .../manager/pojo/source/kafka/KafkaSourceDTO.java      |  8 ++------
 .../manager/pojo/source/mongodb/MongoDBSourceDTO.java  |  8 ++------
 .../pojo/source/mysql/MySQLBinlogSourceDTO.java        |  8 ++------
 .../manager/pojo/source/oracle/OracleSourceDTO.java    |  8 ++------
 .../pojo/source/postgresql/PostgreSQLSourceDTO.java    |  8 ++------
 .../manager/pojo/source/pulsar/PulsarSourceDTO.java    |  8 ++------
 .../manager/pojo/source/redis/RedisSourceDTO.java      |  8 ++------
 .../pojo/source/sqlserver/SQLServerSourceDTO.java      |  8 ++------
 .../manager/pojo/source/tubemq/TubeMQSourceDTO.java    |  8 ++------
 .../resource/sort/DefaultSortConfigOperator.java       |  5 ++---
 .../inlong/manager/workflow/util/WorkflowUtils.java    |  8 ++++----
 72 files changed, 207 insertions(+), 391 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index 1abc7ae40..9e1ddfaf2 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -17,12 +17,11 @@
 
 package org.apache.inlong.manager.client;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Data;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -42,9 +41,7 @@ import java.util.Map;
 @Data
 public class BaseExample {
 
-    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    // Manager web url
+    // Service url of the inlong manager
     private String serviceUrl = "127.0.0.1:8083";
     // Inlong user && passwd
     private DefaultAuthentication inlongAuth = new DefaultAuthentication("admin", "inlong");
@@ -140,4 +137,4 @@ public class BaseExample {
         return hiveSink;
     }
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HBaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HBaseExample.java
index 6771fd24f..df8e59a4a 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HBaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HBaseExample.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.hbase.HBaseColumnFamilyInfo;
@@ -109,9 +110,9 @@ public class File2HBaseExample extends BaseExample {
     }
 
     /**
-     * Create iceberg sink
+     * Create HBase sink
      */
-    public HBaseSink createHBaseSink() throws Exception {
+    public HBaseSink createHBaseSink() {
         HBaseSink sink = new HBaseSink();
 
         sink.setSinkName("{sink.name}");
@@ -130,15 +131,15 @@ public class File2HBaseExample extends BaseExample {
         // field ext param
         HBaseColumnFamilyInfo info1 = new HBaseColumnFamilyInfo();
         info1.setCfName("cf_1");
-        field1.setExtParams(OBJECT_MAPPER.writeValueAsString(info1));
+        field1.setExtParams(JsonUtils.toJsonString(info1));
 
         HBaseColumnFamilyInfo info2 = new HBaseColumnFamilyInfo();
         info2.setCfName("cf_2");
-        field2.setExtParams(OBJECT_MAPPER.writeValueAsString(info2));
+        field2.setExtParams(JsonUtils.toJsonString(info2));
 
         HBaseColumnFamilyInfo info3 = new HBaseColumnFamilyInfo();
         info3.setCfName("cf_3");
-        field3.setExtParams(OBJECT_MAPPER.writeValueAsString(info3));
+        field3.setExtParams(JsonUtils.toJsonString(info3));
 
         List<SinkField> fields = new ArrayList<>();
         fields.add(field1);
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2IcebergExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2IcebergExample.java
index 093161a01..1bb4f6f5f 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2IcebergExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2IcebergExample.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo;
@@ -111,9 +112,9 @@ public class File2IcebergExample extends BaseExample {
     }
 
     /**
-     * Create iceberg sink
+     * Create Iceberg sink
      */
-    public IcebergSink createIcebergSink() throws Exception {
+    public IcebergSink createIcebergSink() {
         IcebergSink sink = new IcebergSink();
 
         sink.setSinkName("{sink.name}");
@@ -136,18 +137,18 @@ public class File2IcebergExample extends BaseExample {
         info1.setRequired(true);
         info1.setPartitionStrategy(IcebergPartition.BUCKET.toString());
         info1.setBucketNum(10);
-        field1.setExtParams(OBJECT_MAPPER.writeValueAsString(info1));
+        field1.setExtParams(JsonUtils.toJsonString(info1));
 
         // field3: decimal column example
         IcebergColumnInfo info3 = new IcebergColumnInfo();
         info3.setScale(5);
-        info3.setPrecision(10);  //NOTE: scale must be less than or equal to precision
-        field3.setExtParams(OBJECT_MAPPER.writeValueAsString(info3));
+        info3.setPrecision(10);  // scale must be less than or equal to precision
+        field3.setExtParams(JsonUtils.toJsonString(info3));
 
-        // field4: hour parititon example
+        // field4: hour partition example
         IcebergColumnInfo info4 = new IcebergColumnInfo();
         info4.setPartitionStrategy(IcebergPartition.HOUR.toString());
-        field4.setExtParams(OBJECT_MAPPER.writeValueAsString(info4));
+        field4.setExtParams(JsonUtils.toJsonString(info4));
 
         List<SinkField> fields = new ArrayList<>();
         fields.add(field1);
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
index a479460b2..aff34302d 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
@@ -17,21 +17,12 @@
 
 package org.apache.inlong.manager.client.cli;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.manager.common.util.JsonUtils;
-
 /**
  * The runner of command.
  * of command for creat connect by config file.
  */
 public abstract class AbstractCommandRunner {
 
-    protected final ObjectMapper objectMapper = new ObjectMapper();
-
-    AbstractCommandRunner() {
-        JsonUtils.initJsonTypeDefine(objectMapper);
-    }
-
     /**
      * Execute the specified command.
      */
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 2c05d560d..21c94af96 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
@@ -74,7 +75,8 @@ public class CreateCommand extends AbstractCommand {
                     content = ClientUtils.readFile(file);
                 }
                 // first extract group config from the file passed in
-                CreateGroupConf groupConf = objectMapper.readValue(content, CreateGroupConf.class);
+                CreateGroupConf groupConf = JsonUtils.parseObject(content, CreateGroupConf.class);
+                assert groupConf != null;
                 // get the corresponding inlong group, aka the task to execute
                 InlongClient inlongClient = ClientUtils.getClient();
                 InlongGroup group = inlongClient.forGroup(groupConf.getGroupInfo());
@@ -107,9 +109,10 @@ public class CreateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterRequest request = objectMapper.readValue(content, ClusterRequest.class);
+                ClusterRequest request = JsonUtils.parseObject(content, ClusterRequest.class);
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
+                assert request != null;
                 Integer clusterId = clusterClient.saveCluster(request);
                 if (clusterId != null) {
                     System.out.println("Create cluster success! ID: " + clusterId);
@@ -133,7 +136,7 @@ public class CreateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterTagRequest request = objectMapper.readValue(content, ClusterTagRequest.class);
+                ClusterTagRequest request = JsonUtils.parseObject(content, ClusterTagRequest.class);
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
                 Integer tagId = clusterClient.saveTag(request);
@@ -159,7 +162,7 @@ public class CreateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterNodeRequest request = objectMapper.readValue(content, ClusterNodeRequest.class);
+                ClusterNodeRequest request = JsonUtils.parseObject(content, ClusterNodeRequest.class);
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
                 Integer nodeId = clusterClient.saveNode(request);
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
index f1045f78e..11ecf1080 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
@@ -75,7 +76,7 @@ public class UpdateCommand extends AbstractCommand {
                     return;
                 }
                 // first extract group config from the file passed in
-                BaseSortConf sortConf = objectMapper.readValue(fileContent, BaseSortConf.class);
+                BaseSortConf sortConf = JsonUtils.parseObject(fileContent, BaseSortConf.class);
                 group.update(sortConf);
                 System.out.println("Update group success!");
             } catch (Exception e) {
@@ -97,7 +98,8 @@ public class UpdateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterRequest request = objectMapper.readValue(content, ClusterRequest.class);
+                ClusterRequest request = JsonUtils.parseObject(content, ClusterRequest.class);
+                assert request != null;
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
                 if (clusterClient.update(request)) {
@@ -122,7 +124,7 @@ public class UpdateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterTagRequest request = objectMapper.readValue(content, ClusterTagRequest.class);
+                ClusterTagRequest request = JsonUtils.parseObject(content, ClusterTagRequest.class);
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
                 if (clusterClient.updateTag(request)) {
@@ -147,7 +149,7 @@ public class UpdateCommand extends AbstractCommand {
         void run() {
             try {
                 String content = ClientUtils.readFile(file);
-                ClusterNodeRequest request = objectMapper.readValue(content, ClusterNodeRequest.class);
+                ClusterNodeRequest request = JsonUtils.parseObject(content, ClusterNodeRequest.class);
                 ClientUtils.initClientFactory();
                 InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
                 if (clusterClient.updateNode(request)) {
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
index 0627e732f..3a67c8c6d 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
@@ -17,11 +17,9 @@
 
 package org.apache.inlong.manager.client.cli.util;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
@@ -38,8 +36,6 @@ public class PrintUtils {
     private static final String horizontal = "—";
     private static final String vertical = "|";
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     /**
      * Print a list info to console with format.
      */
@@ -57,8 +53,8 @@ public class PrintUtils {
      */
     public static <T> void printJson(T item) {
         try {
-            System.out.println(OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(item));
-        } catch (JsonProcessingException e) {
+            System.out.println(JsonUtils.toPrettyJsonString(item));
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
@@ -112,10 +108,10 @@ public class PrintUtils {
      */
     private static <T, K> List<K> copyObject(List<T> list, Class<K> clazz) {
         List<K> newList = new ArrayList<>();
-        OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
         list.forEach(item -> {
             try {
-                K value = OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsString(item), clazz);
+                K value = JsonUtils.parseObject(JsonUtils.toJsonString(item), clazz);
+                assert value != null;
                 parseStatus(value);
                 newList.add(value);
             } catch (Exception e) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
index aa3bfa462..11f57c7e2 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.client.api.inner.client;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
@@ -47,7 +46,6 @@ import java.util.Map;
 public class WorkflowClient {
 
     private final WorkflowApi workflowApi;
-    private final ObjectMapper objectMapper = new ObjectMapper();
 
     public WorkflowClient(ClientConfiguration configuration) {
         workflowApi = ClientUtils.createRetrofit(configuration).create(WorkflowApi.class);
@@ -61,11 +59,11 @@ public class WorkflowClient {
      * @return workflow result info
      */
     public WorkflowResult startInlongGroup(int taskId, ApplyGroupProcessForm groupProcessForm) {
-        ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
+        ObjectNode workflowTaskOperation = JsonUtils.OBJECT_MAPPER.createObjectNode();
         workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
         workflowTaskOperation.put("remark", "approved by system");
 
-        ObjectNode groupApproveForm = objectMapper.createObjectNode();
+        ObjectNode groupApproveForm = JsonUtils.OBJECT_MAPPER.createObjectNode();
         groupApproveForm.putPOJO("groupApproveInfo", groupProcessForm.getGroupInfo());
         groupApproveForm.putPOJO("streamApproveInfoList", groupProcessForm.getStreamInfoList());
         groupApproveForm.put("formName", "InlongGroupApproveForm");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index d079a406b..286cd2b9c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.client.api.util;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.auth.Authentication;
@@ -25,6 +24,7 @@ import org.apache.inlong.manager.common.auth.Authentication.AuthType;
 import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.common.auth.TokenAuthentication;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -41,8 +41,6 @@ import java.util.List;
  */
 public class InlongGroupTransfer {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     /**
      * Create inlong group info from group config.
      */
@@ -130,7 +128,7 @@ public class InlongGroupTransfer {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
             flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
             try {
-                flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(flinkSortConf.getProperties()));
+                flinkProperties.setKeyValue(JsonUtils.toJsonString(flinkSortConf.getProperties()));
             } catch (Exception e) {
                 throw new RuntimeException("get json for sort properties error: " + e.getMessage());
             }
@@ -156,7 +154,7 @@ public class InlongGroupTransfer {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
             flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
             try {
-                flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(userDefinedSortConf.getProperties()));
+                flinkProperties.setKeyValue(JsonUtils.toJsonString(userDefinedSortConf.getProperties()));
             } catch (Exception e) {
                 throw new RuntimeException("get json for sort properties error: " + e.getMessage());
             }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
index 047983f26..ae428048f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.common.auth;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.util.Locale;
 import java.util.Map;
@@ -26,8 +25,6 @@ import java.util.Map;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
 public interface Authentication {
 
-    ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     AuthType getAuthType();
 
     void configure(Map<String, String> properties);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
index 3ef45a874..8185622cd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/DefaultAuthentication.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.Map;
@@ -62,7 +63,7 @@ public class DefaultAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
         objectNode.put(USERNAME, this.getUsername());
         objectNode.put(PASSWORD, this.getPassword());
         return objectNode.toString();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
index e8d69086c..ec542701b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretAuthentication.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.Map;
@@ -63,7 +64,7 @@ public class SecretAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
         objectNode.put(SECRET_ID, this.getSecretId());
         objectNode.put(SECRET_KEY, this.getSecretKey());
         return objectNode.toString();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
index 31867493a..00aa7508d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/SecretTokenAuthentication.java
@@ -22,6 +22,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.Map;
 
@@ -57,7 +58,7 @@ public class SecretTokenAuthentication extends SecretAuthentication {
     @SneakyThrows
     @Override
     public String toString() {
-        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
         objectNode.put(SECRET_ID, this.getSecretId());
         objectNode.put(SECRET_KEY, this.getSecretKey());
         objectNode.put(SECRET_TOKEN, this.getSToken());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
index efaab3fb2..6caf26f6e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/TokenAuthentication.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.Map;
@@ -54,7 +55,7 @@ public class TokenAuthentication implements Authentication {
 
     @Override
     public String toString() {
-        ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+        ObjectNode objectNode = JsonUtils.OBJECT_MAPPER.createObjectNode();
         objectNode.put(TOKEN, this.getToken());
         return objectNode.toString();
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
index f53ac723d..c5feeb237 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
@@ -63,6 +63,14 @@ public class JsonUtils {
         return OBJECT_MAPPER.writeValueAsString(object);
     }
 
+    /**
+     * Transform a Java object to a pretty-printed JSON string
+     */
+    @SneakyThrows
+    public static String toPrettyJsonString(Object object) {
+        return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(object);
+    }
+
     /**
      * Transform Java object to JSON byte
      */
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index b2afe90f5..2232606ae 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.plugin.flink;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -59,7 +58,6 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
 @Slf4j
 public class FlinkOperation {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String CONFIG_FILE = "application.properties";
     private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
     private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
@@ -185,18 +183,20 @@ public class FlinkOperation {
                 .get(0).get(InlongConstants.RELATIONS);
         List<Pair<List<String>, List<String>>> nodeIdsPairList = new ArrayList<>();
         for (int i = 0; i < relations.size(); i++) {
-            List<String> inputIds = OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.INPUTS),
-                    new TypeReference<List<String>>() {
-                    }).stream().collect(Collectors.toList());
+            List<String> inputIds = new ArrayList<>(
+                    JsonUtils.OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.INPUTS),
+                            new TypeReference<List<String>>() {
+                            }));
             if (CollectionUtils.isEmpty(inputIds)) {
                 String message = String.format("input nodeId %s cannot be empty", inputIds);
                 log.error(message);
                 throw new Exception(message);
             }
 
-            List<String> outputIds = OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.OUTPUTS),
-                    new TypeReference<List<String>>() {
-                    }).stream().collect(Collectors.toList());
+            List<String> outputIds = new ArrayList<>(
+                    JsonUtils.OBJECT_MAPPER.convertValue(relations.get(i).get(InlongConstants.OUTPUTS),
+                            new TypeReference<List<String>>() {
+                            }));
             if (CollectionUtils.isEmpty(outputIds)) {
                 String message = String.format("output nodeId %s cannot be empty", outputIds);
                 log.error(message);
@@ -257,7 +257,7 @@ public class FlinkOperation {
             checkNodeIds(dataflow);
             JsonNode nodes = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
                     .get(0).get(InlongConstants.NODES);
-            List<String> types = OBJECT_MAPPER.convertValue(nodes,
+            List<String> types = JsonUtils.OBJECT_MAPPER.convertValue(nodes,
                     new TypeReference<List<Map<String, Object>>>() {
                     }).stream().map(s -> s.get(InlongConstants.NODE_TYPE).toString()).collect(Collectors.toList());
             nodeTypes.addAll(types);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 657a0e4fc..6e65f30dc 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -47,8 +47,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class DeleteSortListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -98,8 +96,8 @@ public class DeleteSortListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 33b8c3057..7c50d9738 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -49,8 +49,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class DeleteStreamListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -87,13 +85,15 @@ public class DeleteStreamListener implements SortOperateListener {
         InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
         List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
+
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
+
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
@@ -103,8 +103,8 @@ public class DeleteStreamListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 382e66851..83abc7d57 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -48,8 +48,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class RestartSortListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -99,8 +97,8 @@ public class RestartSortListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index b899b5f55..f0469d188 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -50,8 +50,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class RestartStreamListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -88,13 +86,14 @@ public class RestartStreamListener implements SortOperateListener {
         InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
         List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
+
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.stream().forEach(extInfo -> {
+        streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
@@ -104,8 +103,8 @@ public class RestartStreamListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 70e1e2aca..1132e4078 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -49,8 +49,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class StartupSortListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -104,8 +102,8 @@ public class StartupSortListener implements SortOperateListener {
                 InlongGroupExtInfo::getKeyValue));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                    new TypeReference<Map<String, String>>() {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                     });
             kvConf.putAll(result);
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 4713927b6..4c0a19c08 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -51,8 +51,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class StartupStreamListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -104,8 +102,8 @@ public class StartupStreamListener implements SortOperateListener {
         });
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
-            Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                    new TypeReference<Map<String, String>>() {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                     });
             kvConf.putAll(result);
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index a7e03e84c..d8a17d2d5 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -47,8 +47,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class SuspendSortListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -98,8 +96,8 @@ public class SuspendSortListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index ea647bf4a..672590d67 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -49,8 +49,6 @@ import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStack
 @Slf4j
 public class SuspendStreamListener implements SortOperateListener {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
@@ -87,13 +85,15 @@ public class SuspendStreamListener implements SortOperateListener {
         InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
         List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
+
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.stream().forEach(extInfo -> {
+        streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
+
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
@@ -103,8 +103,8 @@ public class SuspendStreamListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
-                new TypeReference<Map<String, String>>() {
+        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index e574a5880..ca579bcac 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.jupiter.api.Test;
 
@@ -37,7 +37,7 @@ import java.util.Map;
 public class DeleteSortListenerTest {
 
     @Test
-    public void testListener() throws Exception {
+    public void testListener() {
         WorkflowContext context = new WorkflowContext();
         GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
         context.setProcessForm(groupResourceProcessForm);
@@ -53,9 +53,8 @@ public class DeleteSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
         inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
-        ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
-        String sortStr = objectMapper.writeValueAsString(sortProperties);
+        String sortStr = JsonUtils.toJsonString(sortProperties);
         inlongGroupExtInfo2.setKeyValue(sortStr);
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
index 5699f36c9..1ec594feb 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -36,7 +36,7 @@ import java.util.Map;
 public class RestartSortListenerTest {
 
     @Test
-    public void testListener() throws Exception {
+    public void testListener() {
         WorkflowContext context = new WorkflowContext();
         GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
         context.setProcessForm(groupResourceProcessForm);
@@ -52,9 +52,8 @@ public class RestartSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
         inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
-        ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
-        String sortStr = objectMapper.writeValueAsString(sortProperties);
+        String sortStr = JsonUtils.toJsonString(sortProperties);
         inlongGroupExtInfo2.setKeyValue(sortStr);
         inlongGroupExtInfoList.add(inlongGroupExtInfo2);
 
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
index d3a43edce..9f0d97847 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -36,7 +36,7 @@ import java.util.Map;
 public class StartupSortListenerTest {
 
     @Test
-    public void testListener() throws Exception {
+    public void testListener() {
         WorkflowContext context = new WorkflowContext();
         GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
         context.setProcessForm(groupResourceForm);
@@ -52,9 +52,8 @@ public class StartupSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
         inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
-        ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
-        String sortStr = objectMapper.writeValueAsString(sortProperties);
+        String sortStr = JsonUtils.toJsonString(sortProperties);
         inlongGroupExtInfo2.setKeyValue(sortStr);
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index a88937def..ebd1306c5 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.manager.plugin.listener;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -36,7 +36,7 @@ import java.util.Map;
 public class SuspendSortListenerTest {
 
     @Test
-    public void testListener() throws Exception {
+    public void testListener() {
         WorkflowContext context = new WorkflowContext();
         GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
         context.setProcessForm(groupResourceForm);
@@ -52,9 +52,8 @@ public class SuspendSortListenerTest {
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
         inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
-        ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
-        String sortStr = objectMapper.writeValueAsString(sortProperties);
+        String sortStr = JsonUtils.toJsonString(sortProperties);
         inlongGroupExtInfo2.setKeyValue(sortStr);
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterDTO.java
index ad79759ff..6e923fb97 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.cluster.agent;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -40,9 +39,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Agent cluster info")
 public class AgentClusterDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     @ApiModelProperty(value = "Version number of the server list collected by the cluster")
     private Integer serverVersion;
 
@@ -60,7 +56,7 @@ public class AgentClusterDTO {
      */
     public static AgentClusterDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, AgentClusterDTO.class);
+            return JsonUtils.parseObject(extParams, AgentClusterDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterDTO.java
index c5dc98fcd..c87784ea1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/dataproxy/DataProxyClusterDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.cluster.dataproxy;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -40,8 +39,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("DataProxy cluster info")
 public class DataProxyClusterDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Is the DataProxy cluster an intranet? 0: no, 1: yes")
     private Integer isIntranet = 1;
 
@@ -67,8 +64,7 @@ public class DataProxyClusterDTO {
      */
     public static DataProxyClusterDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, DataProxyClusterDTO.class);
+            return JsonUtils.parseObject(extParams, DataProxyClusterDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
index 79b4f9039..620385a1d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -17,14 +17,13 @@
 
 package org.apache.inlong.manager.pojo.cluster.kafka;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -37,9 +36,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Kafka cluster info")
 public class KafkaClusterDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
-
     /**
      * Get the dto instance from the request
      */
@@ -53,7 +49,7 @@ public class KafkaClusterDTO {
      */
     public static KafkaClusterDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, KafkaClusterDTO.class);
+            return JsonUtils.parseObject(extParams, KafkaClusterDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index 13c317ce5..b807b7635 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.cluster.pulsar;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -40,8 +39,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Pulsar cluster info")
 public class PulsarClusterDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080",
             notes = "Pulsar service URL is the 'url' field of the cluster")
     private String adminUrl;
@@ -65,8 +62,7 @@ public class PulsarClusterDTO {
      */
     public static PulsarClusterDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, PulsarClusterDTO.class);
+            return JsonUtils.parseObject(extParams, PulsarClusterDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
index 6e460c95b..dda32da10 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
@@ -18,8 +18,6 @@
 
 package org.apache.inlong.manager.pojo.cluster.tubemq;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -28,6 +26,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -42,8 +41,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("TubeMQ cluster info")
 public class TubeClusterDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @NotBlank(message = "masterWebUrl cannot be blank")
     @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080",
             notes = "TubeMQ master RPC URL is the 'url' field of the cluster")
@@ -54,8 +51,7 @@ public class TubeClusterDTO {
      */
     public static TubeClusterDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class);
+            return JsonUtils.parseObject(extParams, TubeClusterDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
index 5e6388922..f096e6f44 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.consume.pulsar;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -40,9 +39,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Inlong group dto of Pulsar")
 public class ConsumePulsarDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
     private Integer isDlq;
 
@@ -72,7 +68,7 @@ public class ConsumePulsarDTO {
      */
     public static ConsumePulsarDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, ConsumePulsarDTO.class);
+            return JsonUtils.parseObject(extParams, ConsumePulsarDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
index 6b0b313d5..c16c8021a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
@@ -17,14 +17,13 @@
 
 package org.apache.inlong.manager.pojo.consume.tubemq;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -37,9 +36,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Inlong group info of TubeMQ")
 public class ConsumeTubeMQDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     // no fields
 
     /**
@@ -54,7 +50,7 @@ public class ConsumeTubeMQDTO {
      */
     public static ConsumeTubeMQDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, ConsumeTubeMQDTO.class);
+            return JsonUtils.parseObject(extParams, ConsumeTubeMQDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
index 8d5d3c09c..62a94e722 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.group.kafka;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -39,8 +38,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Inlong group info for Kafka")
 public class InlongKafkaDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
     // partition number
     private Integer numPartitions;
     // replicationFactor number
@@ -67,7 +64,7 @@ public class InlongKafkaDTO {
      */
     public static InlongKafkaDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, InlongKafkaDTO.class);
+            return JsonUtils.parseObject(extParams, InlongKafkaDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index 28e0fba3a..5f42fe0e5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.group.pulsar;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -40,8 +39,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Inlong group info for Pulsar")
 public class InlongPulsarDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
     private String queueModule;
@@ -100,8 +97,7 @@ public class InlongPulsarDTO {
      */
     public static InlongPulsarDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, InlongPulsarDTO.class);
+            return JsonUtils.parseObject(extParams, InlongPulsarDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
index 94bad14d8..8843db2ae 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.manager.pojo.node.ck;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
 import javax.validation.constraints.NotNull;
 
 /**
@@ -34,8 +34,6 @@ import javax.validation.constraints.NotNull;
 @ApiModel("ClickHouse data node info")
 public class ClickHouseDataNodeDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     /**
      * Get the dto instance from the request
      */
@@ -48,8 +46,7 @@ public class ClickHouseDataNodeDTO {
      */
     public static ClickHouseDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, ClickHouseDataNodeDTO.class);
+            return JsonUtils.parseObject(extParams, ClickHouseDataNodeDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
index 3c75602f6..74b57864d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.node.hive;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +43,6 @@ public class HiveDataNodeDTO {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
 
@@ -79,8 +76,7 @@ public class HiveDataNodeDTO {
      */
     public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+            return JsonUtils.parseObject(extParams, HiveDataNodeDTO.class);
         } catch (Exception e) {
             LOGGER.error("Failed to extract additional parameters for hive data node: ", e);
             throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
index b514866f2..d1aada9ec 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.node.iceberg;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +43,6 @@ public class IcebergDataNodeDTO {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IcebergDataNodeDTO.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
     @Builder.Default
     private String catalogType = "HIVE";
@@ -68,8 +65,7 @@ public class IcebergDataNodeDTO {
      */
     public static IcebergDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, IcebergDataNodeDTO.class);
+            return JsonUtils.parseObject(extParams, IcebergDataNodeDTO.class);
         } catch (Exception e) {
             LOGGER.error("Failed to extract additional parameters for iceberg data node: ", e);
             throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
index ddba97a9e..4ed05e257 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.node.mysql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +43,6 @@ public class MySQLDataNodeDTO {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataNodeDTO.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
     @ApiModelProperty("URL of backup DB server")
     private String backupUrl;
 
@@ -64,7 +60,7 @@ public class MySQLDataNodeDTO {
      */
     public static MySQLDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
-            return OBJECT_MAPPER.readValue(extParams, MySQLDataNodeDTO.class);
+            return JsonUtils.parseObject(extParams, MySQLDataNodeDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
index f229c020c..3f5d020cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.ck;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -28,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
@@ -43,8 +42,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class ClickHouseSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("JDBC URL of the ClickHouse server")
     private String jdbcUrl;
 
@@ -134,8 +131,7 @@ public class ClickHouseSinkDTO {
 
     public static ClickHouseSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, ClickHouseSinkDTO.class).decryptPassword();
+            return JsonUtils.parseObject(extParams, ClickHouseSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
index 82a4b0874..63253ff16 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.dlciceberg;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class DLCIcebergSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Catalog URI of the DLCIceberg server")
     private String catalogUri;
 
@@ -78,8 +75,7 @@ public class DLCIcebergSinkDTO {
      */
     public static DLCIcebergSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, DLCIcebergSinkDTO.class);
+            return JsonUtils.parseObject(extParams, DLCIcebergSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
index 58f61eeff..82a280a41 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.es;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -27,6 +25,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -36,8 +35,6 @@ import javax.validation.constraints.NotNull;
 @AllArgsConstructor
 public class ElasticsearchFieldInfo {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Elasticsearch Field name")
     private String name;
 
@@ -64,8 +61,7 @@ public class ElasticsearchFieldInfo {
             return new ElasticsearchFieldInfo();
         }
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, ElasticsearchFieldInfo.class);
+            return JsonUtils.parseObject(extParams, ElasticsearchFieldInfo.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index cf9e150dc..2121f237a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.es;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -28,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
@@ -42,8 +41,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class ElasticsearchSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Host of the Elasticsearch server")
     private String host;
 
@@ -117,8 +114,7 @@ public class ElasticsearchSinkDTO {
      */
     public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, ElasticsearchSinkDTO.class).decryptPassword();
+            return JsonUtils.parseObject(extParams, ElasticsearchSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index 89571e84b..03f77bfe4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.greenplum;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class GreenplumSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("JDBC URL of Greenplum server, such as: jdbc:postgresql://host:port/database")
     private String jdbcUrl;
 
@@ -79,8 +76,7 @@ public class GreenplumSinkDTO {
      */
     public static GreenplumSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, GreenplumSinkDTO.class);
+            return JsonUtils.parseObject(extParams, GreenplumSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseColumnFamilyInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseColumnFamilyInfo.java
index f20865899..6284ed377 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseColumnFamilyInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseColumnFamilyInfo.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.hbase;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -27,6 +25,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -39,8 +38,6 @@ import javax.validation.constraints.NotNull;
 @AllArgsConstructor
 public class HBaseColumnFamilyInfo {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Column family name")
     private String cfName;
 
@@ -55,8 +52,7 @@ public class HBaseColumnFamilyInfo {
             return new HBaseColumnFamilyInfo();
         }
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, HBaseColumnFamilyInfo.class);
+            return JsonUtils.parseObject(extParams, HBaseColumnFamilyInfo.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
index 34f09f81d..bbdd924e3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.hbase;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class HBaseSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Target namespace")
     private String namespace;
 
@@ -91,8 +88,7 @@ public class HBaseSinkDTO {
      */
     public static HBaseSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, HBaseSinkDTO.class);
+            return JsonUtils.parseObject(extParams, HBaseSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
index 5d94a59f2..25b604bd8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.hdfs;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class HDFSSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
     private String fileFormat;
 
@@ -83,8 +80,7 @@ public class HDFSSinkDTO {
      */
     public static HDFSSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, HDFSSinkDTO.class);
+            return JsonUtils.parseObject(extParams, HDFSSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
index 008e16839..4111cbf88 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.hive;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -29,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
@@ -44,8 +43,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class HiveSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
     private String jdbcUrl;
 
@@ -129,8 +126,7 @@ public class HiveSinkDTO {
      */
     public static HiveSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, HiveSinkDTO.class).decryptPassword();
+            return JsonUtils.parseObject(extParams, HiveSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
index 58f8cac4c..e814f5d72 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.iceberg;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -27,6 +25,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 
 /**
@@ -38,8 +37,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 @AllArgsConstructor
 public class IcebergColumnInfo {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Length of fixed type")
     private Integer length;
 
@@ -72,8 +69,7 @@ public class IcebergColumnInfo {
             return new IcebergColumnInfo();
         }
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, IcebergColumnInfo.class);
+            return JsonUtils.parseObject(extParams, IcebergColumnInfo.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 678f2c160..800da63b2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.iceberg;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class IcebergSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
     @Builder.Default
     private String catalogType = "HIVE";
@@ -92,8 +89,7 @@ public class IcebergSinkDTO {
 
     public static IcebergSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, IcebergSinkDTO.class);
+            return JsonUtils.parseObject(extParams, IcebergSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 376d0a94d..3de6ca4fd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.kafka;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class KafkaSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Kafka bootstrap servers")
     private String bootstrapServers;
 
@@ -80,8 +77,7 @@ public class KafkaSinkDTO {
 
     public static KafkaSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, KafkaSinkDTO.class);
+            return JsonUtils.parseObject(extParams, KafkaSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index e2f206212..21604f777 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.mysql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.swagger.annotations.ApiModelProperty;
@@ -29,6 +27,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +49,6 @@ public class MySQLSinkDTO {
      */
     private static final String SENSITIVE_PARAM_TRUE = "autoDeserialize=true";
     private static final String SENSITIVE_PARAM_FALSE = "autoDeserialize=false";
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LoggerFactory.getLogger(MySQLSinkDTO.class);
 
     @ApiModelProperty("MySQL JDBC URL, such as jdbc:mysql://host:port/database")
@@ -98,8 +96,7 @@ public class MySQLSinkDTO {
      */
     public static MySQLSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, MySQLSinkDTO.class);
+            return JsonUtils.parseObject(extParams, MySQLSinkDTO.class);
         } catch (Exception e) {
             LOGGER.error("fetch mysql sink info failed from json params: " + extParams, e);
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
index 86d008b28..553a534ce 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.oracle;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +41,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class OracleSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LoggerFactory.getLogger(OracleSinkDTO.class);
 
     @ApiModelProperty("Oracle JDBC URL,Such as jdbc:oracle:thin@host:port:sid "
@@ -83,8 +81,7 @@ public class OracleSinkDTO {
      */
     public static OracleSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, OracleSinkDTO.class);
+            return JsonUtils.parseObject(extParams, OracleSinkDTO.class);
         } catch (Exception e) {
             LOGGER.error("fetch oracle sink info failed from json params: " + extParams, e);
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 9fb8c30ba..7fd5b45d9 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.postgresql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -28,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
@@ -43,8 +42,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class PostgreSQLSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("JDBC URL of the PostgreSQL server")
     private String jdbcUrl;
 
@@ -99,8 +96,7 @@ public class PostgreSQLSinkDTO {
      */
     public static PostgreSQLSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, PostgreSQLSinkDTO.class).decryptPassword();
+            return JsonUtils.parseObject(extParams, PostgreSQLSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
index ec37f63a6..00c9b9ca3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.sqlserver;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -39,8 +38,6 @@ import java.util.List;
 @AllArgsConstructor
 public class SQLServerSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Username of the SQLServer")
     private String username;
 
@@ -86,8 +83,7 @@ public class SQLServerSinkDTO {
      */
     public static SQLServerSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, SQLServerSinkDTO.class);
+            return JsonUtils.parseObject(extParams, SQLServerSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 735f9934e..569606737 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.sink.tdsqlpostgresql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class TDSQLPostgreSQLSinkDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("TDSQLPostgreSQL jdbc url, such as jdbc:postgresql://host:port/database")
     private String jdbcUrl;
 
@@ -82,8 +79,7 @@ public class TDSQLPostgreSQLSinkDTO {
      */
     public static TDSQLPostgreSQLSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, TDSQLPostgreSQLSinkDTO.class);
+            return JsonUtils.parseObject(extParams, TDSQLPostgreSQLSinkDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
index 372f0d71a..00a9967d9 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -38,8 +37,6 @@ import javax.validation.constraints.NotNull;
 @Data
 public class SubSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("stream source id")
     private Integer id;
 
@@ -54,8 +51,7 @@ public class SubSourceDTO {
 
     public static SubSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, SubSourceDTO.class);
+            return JsonUtils.parseObject(extParams, SubSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
index 336cd01f2..b34c563f3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.autopush;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 
@@ -38,8 +37,6 @@ import javax.validation.constraints.NotNull;
 @AllArgsConstructor
 public class AutoPushSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
@@ -60,8 +57,7 @@ public class AutoPushSourceDTO {
 
     public static AutoPushSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, AutoPushSourceDTO.class);
+            return JsonUtils.parseObject(extParams, AutoPushSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index acd58eea1..41aa7ab0f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.file;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @Data
 public class FileSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
     private String pattern;
 
@@ -95,8 +92,7 @@ public class FileSourceDTO {
 
     public static FileSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, FileSourceDTO.class);
+            return JsonUtils.parseObject(extParams, FileSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index dd4735d9e..f33d097de 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.kafka;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @NoArgsConstructor
 public class KafkaSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Kafka topic")
     private String topic;
 
@@ -126,8 +123,7 @@ public class KafkaSourceDTO {
 
     public static KafkaSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, KafkaSourceDTO.class);
+            return JsonUtils.parseObject(extParams, KafkaSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceDTO.java
index edde6d627..548bd5cdb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.mongodb;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @NoArgsConstructor
 public class MongoDBSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Hosts of the MongoDB server")
     private String hosts;
 
@@ -82,8 +79,7 @@ public class MongoDBSourceDTO {
      */
     public static MongoDBSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, MongoDBSourceDTO.class);
+            return JsonUtils.parseObject(extParams, MongoDBSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java
index 8e8a028b3..029422fd7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.mysql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class MySQLBinlogSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Username of the MySQL server")
     private String user;
 
@@ -151,8 +148,7 @@ public class MySQLBinlogSourceDTO {
 
     public static MySQLBinlogSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, MySQLBinlogSourceDTO.class);
+            return JsonUtils.parseObject(extParams, MySQLBinlogSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceDTO.java
index e9d3503ff..6f775a059 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oracle/OracleSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.oracle;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class OracleSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Hostname of the Oracle server")
     private String hostname;
 
@@ -91,8 +88,7 @@ public class OracleSourceDTO {
 
     public static OracleSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, OracleSourceDTO.class);
+            return JsonUtils.parseObject(extParams, OracleSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceDTO.java
index d23d99690..be2615b76 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/postgresql/PostgreSQLSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.postgresql;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.List;
@@ -40,8 +39,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class PostgreSQLSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
-
     @ApiModelProperty("Username of the PostgreSQL server")
     private String username;
 
@@ -92,8 +89,7 @@ public class PostgreSQLSourceDTO {
 
     public static PostgreSQLSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, PostgreSQLSourceDTO.class);
+            return JsonUtils.parseObject(extParams, PostgreSQLSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 3a4623868..4ebb29b47 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.pulsar;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @NoArgsConstructor
 public class PulsarSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Pulsar tenant")
     private String tenant;
 
@@ -94,8 +91,7 @@ public class PulsarSourceDTO {
 
     public static PulsarSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, PulsarSourceDTO.class);
+            return JsonUtils.parseObject(extParams, PulsarSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
index 10892affb..a41a587c7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/redis/RedisSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.redis;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class RedisSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Username of the redis server")
     private String username;
 
@@ -139,8 +136,7 @@ public class RedisSourceDTO {
 
     public static RedisSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, RedisSourceDTO.class);
+            return JsonUtils.parseObject(extParams, RedisSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceDTO.java
index 0c1b65bcf..5b7dd4e50 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sqlserver/SQLServerSourceDTO.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.pojo.source.sqlserver;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -39,8 +38,6 @@ import java.util.Map;
 @AllArgsConstructor
 public class SQLServerSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Username of the SQLServer server")
     private String username;
 
@@ -98,8 +95,7 @@ public class SQLServerSourceDTO {
      */
     public static SQLServerSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, SQLServerSourceDTO.class);
+            return JsonUtils.parseObject(extParams, SQLServerSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
index 571373b51..04a41e80b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -18,8 +18,6 @@
 
 package org.apache.inlong.manager.pojo.source.tubemq;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -27,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -41,8 +40,6 @@ import java.util.TreeSet;
 @AllArgsConstructor
 public class TubeMQSourceDTO {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     @ApiModelProperty("Master RPC of the TubeMQ,127.0.0.1:8715")
     private String masterRpc;
 
@@ -87,8 +84,7 @@ public class TubeMQSourceDTO {
      */
     public static TubeMQSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-            return OBJECT_MAPPER.readValue(extParams, TubeMQSourceDTO.class);
+            return JsonUtils.parseObject(extParams, TubeMQSourceDTO.class);
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 6096777ac..790e46049 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.service.resource.sort;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -60,7 +60,6 @@ import java.util.stream.Collectors;
 public class DefaultSortConfigOperator implements SortConfigOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @Autowired
     private StreamSourceService sourceService;
@@ -83,7 +82,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         }
 
         GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
-        String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+        String dataflow = JsonUtils.toJsonString(configInfo);
         if (isStream) {
             this.addToStreamExt(streamInfos, dataflow);
         } else {
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowUtils.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowUtils.java
index c70659e06..026f1a937 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowUtils.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowUtils.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.TaskStatus;
 import org.apache.inlong.manager.common.exceptions.FormParseException;
 import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -53,7 +54,6 @@ import java.util.Arrays;
  */
 public class WorkflowUtils {
 
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowUtils.class);
 
     /**
@@ -111,10 +111,10 @@ public class WorkflowUtils {
                 .build();
         try {
             if (StringUtils.isNotBlank(entity.getFormData())) {
-                processResponse.setFormData(OBJECT_MAPPER.readTree(entity.getFormData()));
+                processResponse.setFormData(JsonUtils.parseTree(entity.getFormData()));
             }
             if (StringUtils.isNotBlank(entity.getExtParams())) {
-                processResponse.setExtParams(OBJECT_MAPPER.readTree(entity.getExtParams()));
+                processResponse.setExtParams(JsonUtils.parseTree(entity.getExtParams()));
             }
         } catch (Exception e) {
             LOGGER.error("parse process form error: ", e);
@@ -152,7 +152,7 @@ public class WorkflowUtils {
         try {
             JsonNode formData = null;
             if (StringUtils.isNotBlank(taskEntity.getFormData())) {
-                formData = OBJECT_MAPPER.readTree(taskEntity.getFormData());
+                formData = JsonUtils.parseTree(taskEntity.getFormData());
             }
             taskResponse.setFormData(formData);
         } catch (Exception e) {