You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/11/07 11:37:57 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-4012][common/remote] Json util code integration, remove the remote module json util (#4013)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4b502e3  [Improvement-4012][common/remote] Json util code integration, remove the remote module json util (#4013)
4b502e3 is described below

commit 4b502e361fa0d41d142b1f2c0845d64d1dcc1abd
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Sat Nov 7 19:37:48 2020 +0800

    [Improvement-4012][common/remote] Json util code integration, remove the remote module json util (#4013)
    
    * Json util code integration, remove the remote module json util.
    
    * update code checkstyle.
    
    * update code checkstyle.
    
    * add code checkstyle.
    
    * add test class.
    
    * update JSONUtils class.
    
    * update JSONUtils class.
---
 .../dolphinscheduler/common/utils/JSONUtils.java   |  39 ++++++++
 .../common/utils/JSONUtilsTest.java                |  35 ++++---
 dolphinscheduler-remote/pom.xml                    |   8 +-
 .../remote/command/TaskExecuteAckCommand.java      |  28 +++---
 .../remote/command/TaskExecuteRequestCommand.java  |  13 +--
 .../remote/command/TaskExecuteResponseCommand.java |  27 +++---
 .../remote/command/TaskKillRequestCommand.java     |  14 +--
 .../remote/command/TaskKillResponseCommand.java    |  22 ++---
 .../command/log/GetLogBytesRequestCommand.java     |   6 +-
 .../command/log/GetLogBytesResponseCommand.java    |   6 +-
 .../command/log/RemoveTaskLogRequestCommand.java   |   6 +-
 .../command/log/RemoveTaskLogResponseCommand.java  |   6 +-
 .../command/log/RollViewLogRequestCommand.java     |   6 +-
 .../command/log/RollViewLogResponseCommand.java    |   6 +-
 .../remote/command/log/ViewLogRequestCommand.java  |   6 +-
 .../remote/command/log/ViewLogResponseCommand.java |   6 +-
 .../remote/utils/JsonSerializer.java               |  92 -------------------
 .../remote/JsonSerializerTest.java                 |  58 ------------
 .../server/entity/TaskExecutionContext.java        |   4 +-
 .../server/log/LoggerRequestProcessor.java         |  64 ++++++++-----
 .../server/master/processor/TaskAckProcessor.java  |  16 ++--
 .../processor/TaskKillResponseProcessor.java       |   9 +-
 .../master/processor/TaskResponseProcessor.java    |  17 ++--
 .../worker/processor/TaskExecuteProcessor.java     |  15 ++-
 .../server/worker/processor/TaskKillProcessor.java |  23 ++---
 .../master/processor/TaskAckProcessorTest.java     | 101 +++++++++++++++++++++
 .../processor/TaskKillResponseProcessorTest.java   |  64 +++++++++++++
 .../service/log/LogClientService.java              |  13 ++-
 pom.xml                                            |   2 +
 29 files changed, 402 insertions(+), 310 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 56ef74d..fc11a2a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
 import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
 import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
@@ -127,6 +129,22 @@ public class JSONUtils {
     }
 
     /**
+     * deserialize a UTF-8 encoded json byte array into an object
+     *
+     * @param src byte array
+     * @param clazz class
+     * @param <T> deserialize type
+     * @return deserialized object, or null when src is null
+     */
+    public static <T> T parseObject(byte[] src, Class<T> clazz) {
+        if (src == null) {
+            return null;
+        }
+        String json = new String(src, UTF_8);
+        return parseObject(json, clazz);
+    }
+
+    /**
      * json to list
      *
      * @param json json string
@@ -253,6 +271,27 @@ public class JSONUtils {
         }
     }
 
+    /**
+     * serialize an object to a UTF-8 encoded json byte array
+     *
+     * @param obj object
+     * @param <T> object type
+     * @return json byte array; null when obj is null, empty when toJsonString throws
+     */
+    public static <T> byte[] toJsonByteArray(T obj)  {
+        if (obj == null) {
+            return null;
+        }
+        String json = "";
+        try {
+            json = toJsonString(obj);
+        } catch (Exception e) {
+            logger.error("json serialize exception.", e);
+        }
+
+        return json.getBytes(UTF_8);
+    }
+
     public static ObjectNode parseObject(String text) {
         try {
             return (ObjectNode) objectMapper.readTree(text);
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
index e273496..af12d5a 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.utils;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.process.Property;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,13 +28,15 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.dolphinscheduler.common.enums.DataType;
-import org.apache.dolphinscheduler.common.enums.Direct;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.process.Property;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class JSONUtilsTest {
 
     @Test
@@ -108,9 +110,8 @@ public class JSONUtilsTest {
         Assert.assertEquals(Direct.IN, direct);
     }
 
-
     @Test
-    public void String2MapTest() {
+    public void string2MapTest() {
         String str = list2String();
 
         List<LinkedHashMap> maps = JSONUtils.toList(str,
@@ -146,6 +147,18 @@ public class JSONUtilsTest {
     }
 
     @Test
+    public void testJsonByteArray() {
+        String str = "foo";
+        byte[] serializeByte = JSONUtils.toJsonByteArray(str);
+        String deserialize = JSONUtils.parseObject(serializeByte, String.class);
+        Assert.assertEquals(str, deserialize);
+        str = null;
+        serializeByte = JSONUtils.toJsonByteArray(str);
+        deserialize = JSONUtils.parseObject(serializeByte, String.class);
+        Assert.assertNull(deserialize);
+    }
+
+    @Test
     public void testToList() {
         Assert.assertEquals(new ArrayList(),
                 JSONUtils.toList("A1B2C3", null));
diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
index 4d398f3..4af03a3 100644
--- a/dolphinscheduler-remote/pom.xml
+++ b/dolphinscheduler-remote/pom.xml
@@ -36,6 +36,10 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
@@ -48,10 +52,6 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
 
     </dependencies>
 </project>
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
index 135c149..2fc70f1 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
@@ -14,14 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 import java.util.Date;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+
 /**
  *  execute task request command
  */
@@ -35,7 +37,7 @@ public class TaskExecuteAckCommand implements Serializable {
     /**
      * startTime
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date startTime;
 
     /**
@@ -111,23 +113,23 @@ public class TaskExecuteAckCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_EXECUTE_ACK);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
     @Override
     public String toString() {
-        return "TaskExecuteAckCommand{" +
-                "taskInstanceId=" + taskInstanceId +
-                ", startTime=" + startTime +
-                ", host='" + host + '\'' +
-                ", status=" + status +
-                ", logPath='" + logPath + '\'' +
-                ", executePath='" + executePath + '\'' +
-                '}';
+        return "TaskExecuteAckCommand{"
+                + "taskInstanceId=" + taskInstanceId
+                + ", startTime=" + startTime
+                + ", host='" + host + '\''
+                + ", status=" + status
+                + ", logPath='" + logPath + '\''
+                + ", executePath='" + executePath + '\''
+                + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
index 4ae28e3..5b2e339 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 
@@ -50,18 +51,18 @@ public class TaskExecuteRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_EXECUTE_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
     @Override
     public String toString() {
-        return "TaskExecuteRequestCommand{" +
-                "taskExecutionContext='" + taskExecutionContext + '\'' +
-                '}';
+        return "TaskExecuteRequestCommand{"
+                + "taskExecutionContext='" + taskExecutionContext + '\''
+                + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index 7f6ee66..de5b82c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -14,20 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 import java.util.Date;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+
 /**
  *  execute task response command
  */
 public class TaskExecuteResponseCommand implements Serializable {
 
-
     public TaskExecuteResponseCommand() {
     }
 
@@ -49,7 +50,7 @@ public class TaskExecuteResponseCommand implements Serializable {
     /**
      *  end time
      */
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
     private Date endTime;
 
 
@@ -120,22 +121,22 @@ public class TaskExecuteResponseCommand implements Serializable {
      * package response command
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_EXECUTE_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
     @Override
     public String toString() {
-        return "TaskExecuteResponseCommand{" +
-                "taskInstanceId=" + taskInstanceId +
-                ", status=" + status +
-                ", endTime=" + endTime +
-                ", processId=" + processId +
-                ", appIds='" + appIds + '\'' +
-                '}';
+        return "TaskExecuteResponseCommand{"
+                + "taskInstanceId=" + taskInstanceId
+                + ", status=" + status
+                + ", endTime=" + endTime
+                + ", processId=" + processId
+                + ", appIds='" + appIds + '\''
+                + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index 4c0830b..155b317 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 
@@ -30,7 +31,6 @@ public class TaskKillRequestCommand implements Serializable {
      */
     private int taskInstanceId;
 
-
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
@@ -44,18 +44,18 @@ public class TaskKillRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_KILL_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
     @Override
     public String toString() {
-        return "TaskKillRequestCommand{" +
-                "taskInstanceId=" + taskInstanceId +
-                '}';
+        return "TaskKillRequestCommand{"
+                + "taskInstanceId=" + taskInstanceId
+                + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
index 4b48c1e..f77221d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.command;
 
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.io.Serializable;
 import java.util.List;
@@ -52,7 +53,6 @@ public class TaskKillResponseCommand implements Serializable {
      */
     protected List<String> appIds;
 
-
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
@@ -98,22 +98,22 @@ public class TaskKillResponseCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.TASK_KILL_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
 
     @Override
     public String toString() {
-        return "TaskKillResponseCommand{" +
-                "taskInstanceId=" + taskInstanceId +
-                ", host='" + host + '\'' +
-                ", status=" + status +
-                ", processId=" + processId +
-                ", appIds=" + appIds +
-                '}';
+        return "TaskKillResponseCommand{"
+                + "taskInstanceId=" + taskInstanceId
+                + ", host='" + host + '\''
+                + ", status=" + status
+                + ", processId=" + processId
+                + ", appIds=" + appIds
+                + '}';
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
index e4b21e2..ef71e07 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -53,10 +53,10 @@ public class GetLogBytesRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.GET_LOG_BYTES_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
index 349ec03..e8e3eb2 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -54,10 +54,10 @@ public class GetLogBytesResponseCommand implements Serializable {
      * @param opaque request unique identification
      * @return command
      */
-    public Command convert2Command(long opaque){
+    public Command convert2Command(long opaque) {
         Command command = new Command(opaque);
         command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
index a91cb2a..c5960d6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -53,10 +53,10 @@ public class RemoveTaskLogRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
index 39e8672..6883ece 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -53,10 +53,10 @@ public class RemoveTaskLogResponseCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(long opaque){
+    public Command convert2Command(long opaque) {
         Command command = new Command(opaque);
         command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
index 00129c7..4afee09 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -81,10 +81,10 @@ public class RollViewLogRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
index a4f4f86..0e9e44a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -54,10 +54,10 @@ public class RollViewLogResponseCommand implements Serializable {
      * @param opaque request unique identification
      * @return command
      */
-    public Command convert2Command(long opaque){
+    public Command convert2Command(long opaque) {
         Command command = new Command(opaque);
         command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
index 1d51653..e809469 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -53,10 +53,10 @@ public class ViewLogRequestCommand implements Serializable {
      *
      * @return command
      */
-    public Command convert2Command(){
+    public Command convert2Command() {
         Command command = new Command();
         command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
index 6940104..33e2630 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.remote.command.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 
@@ -54,10 +54,10 @@ public class ViewLogResponseCommand implements Serializable {
      * @param opaque request unique identification
      * @return command
      */
-    public Command convert2Command(long opaque){
+    public Command convert2Command(long opaque) {
         Command command = new Command(opaque);
         command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
-        byte[] body = JsonSerializer.serialize(this);
+        byte[] body = JSONUtils.toJsonByteArray(this);
         command.setBody(body);
         return command;
     }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java
deleted file mode 100644
index 0c05232..0000000
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.remote.utils;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-/**
- *  json serialize or deserialize
- */
-public class JsonSerializer {
-	private static final ObjectMapper objectMapper = new ObjectMapper();
-	private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
-
-	private JsonSerializer(){
-
-	}
-	/**
-	 * serialize to byte
-	 *
-	 * @param obj object
-	 * @param <T> object type
-	 * @return byte array
-	 */
-	public static <T> byte[] serialize(T obj)  {
-        String json = "";
-        try {
-            json = objectMapper.writeValueAsString(obj);
-        } catch (JsonProcessingException e) {
-            logger.error("serializeToString exception!", e);
-        }
-
-		return json.getBytes(Constants.UTF8);
-	}
-
-	/**
-	 *  serialize to string
-	 * @param obj object
-	 * @param <T> object type
-	 * @return string
-	 */
-	public static <T> String serializeToString(T obj)  {
-		String json = "";
-		try {
-			 json = objectMapper.writeValueAsString(obj);
-		} catch (JsonProcessingException e) {
-			logger.error("serializeToString exception!", e);
-		}
-
-		return json;
-	}
-
-	/**
-	 *  deserialize
-	 *
-	 * @param src byte array
-	 * @param clazz class
-	 * @param <T> deserialize type
-	 * @return deserialize type
-	 */
-	public static <T> T deserialize(byte[] src, Class<T> clazz) {
-
-        String json = new String(src, StandardCharsets.UTF_8);
-        try {
-            return objectMapper.readValue(json, clazz);
-        } catch (IOException e) {
-            logger.error("deserialize exception!", e);
-            return null;
-        }
-
-	}
-
-}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java
deleted file mode 100644
index cb92db7..0000000
--- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.remote;
-
-
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JsonSerializerTest {
-
-    @Test
-    public void testSerialize(){
-        TestObj testObj = new TestObj();
-        testObj.setAge(12);
-        byte[] serializeByte = JsonSerializer.serialize(testObj);
-
-        //
-        TestObj deserialize = JsonSerializer.deserialize(serializeByte, TestObj.class);
-
-        Assert.assertEquals(testObj.getAge(), deserialize.getAge());
-    }
-
-    static class TestObj {
-
-        private int age;
-
-        public int getAge() {
-            return age;
-        }
-
-        public void setAge(int age) {
-            this.age = age;
-        }
-
-        @Override
-        public String toString() {
-            return "TestObj{" +
-                    "age=" + age +
-                    '}';
-        }
-    }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 1589c36..7d28a77 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -18,9 +18,9 @@
 package org.apache.dolphinscheduler.server.entity;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import java.io.Serializable;
 import java.util.Date;
@@ -475,7 +475,7 @@ public class TaskExecutionContext implements Serializable {
 
     public Command toCommand() {
         TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        requestCommand.setTaskExecutionContext(JsonSerializer.serializeToString(this));
+        requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(this));
         return requestCommand.convert2Command();
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index 458afa6..b31785c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -14,19 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.log;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.utils.IOUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -38,6 +49,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
 /**
  *  logger request process logic
  */
@@ -47,7 +63,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
 
     private final ThreadPoolExecutor executor;
 
-    public LoggerRequestProcessor(){
+    public LoggerRequestProcessor() {
         this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
     }
 
@@ -59,35 +75,35 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
          * reuqest task log command type
          */
         final CommandType commandType = command.getType();
-        switch (commandType){
+        switch (commandType) {
             case GET_LOG_BYTES_REQUEST:
-                GetLogBytesRequestCommand getLogRequest = JsonSerializer.deserialize(
+                GetLogBytesRequestCommand getLogRequest = JSONUtils.parseObject(
                         command.getBody(), GetLogBytesRequestCommand.class);
                 byte[] bytes = getFileContentBytes(getLogRequest.getPath());
                 GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
                 channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
                 break;
             case VIEW_WHOLE_LOG_REQUEST:
-                ViewLogRequestCommand viewLogRequest = JsonSerializer.deserialize(
+                ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
                         command.getBody(), ViewLogRequestCommand.class);
                 String msg = readWholeFileContent(viewLogRequest.getPath());
                 ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
                 channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
                 break;
             case ROLL_VIEW_LOG_REQUEST:
-                RollViewLogRequestCommand rollViewLogRequest = JsonSerializer.deserialize(
+                RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject(
                         command.getBody(), RollViewLogRequestCommand.class);
                 List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
                         rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
                 StringBuilder builder = new StringBuilder();
-                for (String line : lines){
+                for (String line : lines) {
                     builder.append(line + "\r\n");
                 }
                 RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
                 channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
                 break;
             case REMOVE_TAK_LOG_REQUEST:
-                RemoveTaskLogRequestCommand removeTaskLogRequest = JsonSerializer.deserialize(
+                RemoveTaskLogRequestCommand removeTaskLogRequest = JSONUtils.parseObject(
                         command.getBody(), RemoveTaskLogRequestCommand.class);
 
                 String taskLogPath = removeTaskLogRequest.getPath();
@@ -95,10 +111,10 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
                 File taskLogFile = new File(taskLogPath);
                 Boolean status = true;
                 try {
-                    if (taskLogFile.exists()){
+                    if (taskLogFile.exists()) {
                         status = taskLogFile.delete();
                     }
-                }catch (Exception e){
+                } catch (Exception e) {
                     status = false;
                 }
 
@@ -110,7 +126,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
         }
     }
 
-    public ExecutorService getExecutor(){
+    public ExecutorService getExecutor() {
         return this.executor;
     }
 
@@ -121,7 +137,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
      * @return byte array of file
      * @throws Exception exception
      */
-    private byte[] getFileContentBytes(String filePath){
+    private byte[] getFileContentBytes(String filePath) {
         InputStream in = null;
         ByteArrayOutputStream bos = null;
         try {
@@ -133,9 +149,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
                 bos.write(buf, 0, len);
             }
             return bos.toByteArray();
-        }catch (IOException e){
+        } catch (IOException e) {
             logger.error("get file bytes error",e);
-        }finally {
+        } finally {
             IOUtils.closeQuietly(bos);
             IOUtils.closeQuietly(in);
         }
@@ -152,7 +168,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
      */
     private List<String> readPartFileContent(String filePath,
                                             int skipLine,
-                                            int limit){
+                                            int limit) {
         try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
             return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
         } catch (IOException e) {
@@ -167,19 +183,19 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
      * @param filePath file path
      * @return whole file content
      */
-    private String readWholeFileContent(String filePath){
+    private String readWholeFileContent(String filePath) {
         BufferedReader br = null;
         String line;
         StringBuilder sb = new StringBuilder();
         try {
             br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
-            while ((line = br.readLine()) != null){
+            while ((line = br.readLine()) != null) {
                 sb.append(line + "\r\n");
             }
             return sb.toString();
-        }catch (IOException e){
+        } catch (IOException e) {
             logger.error("read file error",e);
-        }finally {
+        } finally {
             IOUtils.closeQuietly(br);
         }
         return "";
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 0f038dd..0b4d26d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -17,10 +17,12 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import io.netty.channel.Channel;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -28,17 +30,17 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import io.netty.channel.Channel;
 
 /**
  *  task ack processor
@@ -63,7 +65,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
      */
     private ProcessService processService;
 
-    public TaskAckProcessor(){
+    public TaskAckProcessor() {
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
         this.processService = SpringApplicationContext.getBean(ProcessService.class);
@@ -77,7 +79,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskExecuteAckCommand taskAckCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
+        TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
         logger.info("taskAckCommand : {}", taskAckCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
@@ -96,10 +98,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
 
         taskResponseService.addResponse(taskResponseEvent);
 
-        while (Stopper.isRunning()){
+        while (Stopper.isRunning()) {
             TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
 
-            if (taskInstance != null && ackStatus.typeIsRunning()){
+            if (taskInstance != null && ackStatus.typeIsRunning()) {
                 break;
             }
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 2e51998..afd0577 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -17,16 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.Channel;
+
 /**
  *  task response processor
  */
@@ -45,9 +47,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        TaskKillResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
+        TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
         logger.info("received task kill response command : {}", responseCommand);
     }
 
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 2633ccd..ffe3d5d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -17,27 +17,29 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
-import io.netty.channel.Channel;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import io.netty.channel.Channel;
 
 /**
  *  task response processor
@@ -61,7 +63,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     private ProcessService processService;
 
-    public TaskResponseProcessor(){
+    public TaskResponseProcessor() {
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
         this.processService = SpringApplicationContext.getBean(ProcessService.class);
@@ -78,7 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
+        TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
         logger.info("received command : {}", responseCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
@@ -95,15 +97,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
 
         taskResponseService.addResponse(taskResponseEvent);
 
-        while (Stopper.isRunning()){
+        while (Stopper.isRunning()) {
             TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
 
-            if (taskInstance != null && responseStatus.typeIsFinished()){
+            if (taskInstance != null && responseStatus.typeIsFinished()) {
                 break;
             }
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
     }
 
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3717ce3..13a9dca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -73,7 +72,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      */
     private final TaskCallbackService taskCallbackService;
 
-    public TaskExecuteProcessor(){
+    public TaskExecuteProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
@@ -84,12 +83,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                 String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteRequestCommand taskRequestCommand = JsonSerializer.deserialize(
+        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
                 command.getBody(), TaskExecuteRequestCommand.class);
 
         logger.info("received command : {}", taskRequestCommand);
 
-        if(taskRequestCommand == null){
+        if (taskRequestCommand == null) {
             logger.error("task execute request command is null");
             return;
         }
@@ -97,7 +96,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         String contextJson = taskRequestCommand.getTaskExecutionContext();
         TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
 
-        if(taskExecutionContext == null){
+        if (taskExecutionContext == null) {
             logger.error("task execution context is null");
             return;
         }
@@ -162,9 +161,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
         ackCommand.setHost(taskExecutionContext.getHost());
         ackCommand.setStartTime(taskExecutionContext.getStartTime());
-        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
+        if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
             ackCommand.setExecutePath(null);
-        }else{
+        } else {
             ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
         }
         taskExecutionContext.setLogPath(ackCommand.getLogPath());
@@ -176,7 +175,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      * @param taskExecutionContext taskExecutionContext
      * @return execute local path
      */
-    private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
+    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
         return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
                 taskExecutionContext.getProcessDefineId(),
                 taskExecutionContext.getProcessInstanceId(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 800db21..21108d1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@@ -39,12 +38,15 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
 /**
  *  task kill processor
  */
@@ -67,8 +69,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      */
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
-
-    public TaskKillProcessor(){
+    public TaskKillProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
@@ -83,7 +84,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskKillRequestCommand killCommand = JsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
+        TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
         logger.info("received kill command : {}", killCommand);
 
         Pair<Boolean, List<String>> result = doKill(killCommand);
@@ -101,14 +102,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @param killCommand
      * @return kill result
      */
-    private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
+    private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
         List<String> appIds = Collections.EMPTY_LIST;
         try {
             TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
 
             Integer processId = taskExecutionContext.getProcessId();
 
-            if (processId == null || processId.equals(0)){
+            if (processId == null || processId.equals(0)) {
                 logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
                 return Pair.of(false, appIds);
             }
@@ -145,7 +146,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
         taskKillResponseCommand.setAppIds(result.getRight());
         TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
-        if(taskExecutionContext != null){
+        if (taskExecutionContext != null) {
             taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
             taskKillResponseCommand.setHost(taskExecutionContext.getHost());
             taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@@ -183,7 +184,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         } catch (Exception e) {
             logger.error("kill yarn job error",e);
         } finally {
-            if(logClient != null){
+            if (logClient != null) {
                 logClient.close();
             }
         }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
new file mode 100644
index 0000000..f4805a7
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+/**
+ * Drives {@code TaskAckProcessor#process} with a sample ack command, using mocked Spring beans and a mocked channel.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class})
+public class TaskAckProcessorTest {
+
+    private TaskAckProcessor taskAckProcessor;
+    private TaskResponseService taskResponseService;
+    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
+    private ProcessService processService;
+    private TaskExecuteAckCommand taskExecuteAckCommand;
+    private TaskResponseEvent taskResponseEvent;
+    private Channel channel;
+
+    /** Stubs the static bean lookups first, then creates the processor, the mocks and the sample ack. */
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        taskResponseService = stubBean(TaskResponseService.class);
+        taskInstanceCacheManager = stubBean(TaskInstanceCacheManagerImpl.class);
+        processService = stubBean(ProcessService.class);
+        taskAckProcessor = new TaskAckProcessor();
+        channel = PowerMockito.mock(Channel.class);
+        taskResponseEvent = PowerMockito.mock(TaskResponseEvent.class);
+
+        TaskExecuteAckCommand ack = new TaskExecuteAckCommand();
+        ack.setTaskInstanceId(1);
+        ack.setStatus(1);
+        ack.setHost("localhost");
+        ack.setExecutePath("/dolphinscheduler/worker");
+        ack.setLogPath("/temp/worker.log");
+        ack.setStartTime(new Date());
+        taskExecuteAckCommand = ack;
+    }
+
+    /** Creates a mock of {@code beanType} and makes {@code SpringApplicationContext.getBean} return it. */
+    private <T> T stubBean(Class<T> beanType) {
+        T bean = PowerMockito.mock(beanType);
+        PowerMockito.when(SpringApplicationContext.getBean(beanType)).thenReturn(bean);
+        return bean;
+    }
+
+    /** Checks the converted command's type, stubs the collaborators and runs the command through the processor. */
+    @Test
+    public void testProcess() {
+        Command ackCommand = taskExecuteAckCommand.convert2Command();
+        Assert.assertEquals(CommandType.TASK_EXECUTE_ACK, ackCommand.getType());
+        InetSocketAddress remote = new InetSocketAddress("localhost", 12345);
+        PowerMockito.when(channel.remoteAddress()).thenReturn(remote);
+        PowerMockito.mockStatic(TaskResponseEvent.class);
+        PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(taskResponseEvent);
+        TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class);
+        PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
+        taskAckProcessor.process(channel, ackCommand);
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
new file mode 100644
index 0000000..c7f0475
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
+
+import java.util.ArrayList;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import io.netty.channel.Channel;
+
+/**
+ * Sends a sample {@code TaskKillResponseCommand} through {@code TaskKillResponseProcessor#process}.
+ */
+public class TaskKillResponseProcessorTest {
+
+    private TaskKillResponseProcessor taskKillResponseProcessor;
+    private TaskKillResponseCommand taskKillResponseCommand;
+    private Channel channel;
+
+    /** Creates the processor, a mocked channel and a sample kill response carrying one app id. */
+    @Before
+    public void before() {
+        taskKillResponseProcessor = new TaskKillResponseProcessor();
+        channel = PowerMockito.mock(Channel.class);
+        ArrayList<String> appIds = new ArrayList<>();
+        appIds.add("task_1");
+        taskKillResponseCommand = new TaskKillResponseCommand();
+        taskKillResponseCommand.setAppIds(appIds);
+        taskKillResponseCommand.setHost("localhost");
+        taskKillResponseCommand.setProcessId(1);
+        taskKillResponseCommand.setStatus(1);
+        taskKillResponseCommand.setTaskInstanceId(1);
+    }
+
+    /** Converts the response to a command, checks its type and passes it to the processor; nothing about the outcome of process is asserted. */
+    @Test
+    public void testProcess() {
+        Command command = taskKillResponseCommand.convert2Command();
+        Assert.assertEquals(CommandType.TASK_KILL_RESPONSE, command.getType());
+        taskKillResponseProcessor.process(channel, command);
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 474bf12..75753c7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.log;
 
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@@ -28,12 +30,10 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * log client
  */
@@ -90,7 +90,7 @@ public class LogClientService {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize(
+                RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
                         response.getBody(), RollViewLogResponseCommand.class);
                 return rollReviewLog.getMsg();
             }
@@ -119,7 +119,7 @@ public class LogClientService {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                ViewLogResponseCommand viewLog = JsonSerializer.deserialize(
+                ViewLogResponseCommand viewLog = JSONUtils.parseObject(
                         response.getBody(), ViewLogResponseCommand.class);
                 return viewLog.getMsg();
             }
@@ -148,7 +148,7 @@ public class LogClientService {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                GetLogBytesResponseCommand getLog = JsonSerializer.deserialize(
+                GetLogBytesResponseCommand getLog = JSONUtils.parseObject(
                         response.getBody(), GetLogBytesResponseCommand.class);
                 return getLog.getData();
             }
@@ -160,7 +160,6 @@ public class LogClientService {
         return result;
     }
 
-
     /**
      * remove task log
      *
@@ -178,7 +177,7 @@ public class LogClientService {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize(
+                RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject(
                         response.getBody(), RemoveTaskLogResponseCommand.class);
                 return taskLogResponse.getStatus();
             }
diff --git a/pom.xml b/pom.xml
index 9220668..8bba558 100644
--- a/pom.xml
+++ b/pom.xml
@@ -831,6 +831,8 @@
                         <include>**/server/master/MasterExecThreadTest.java</include>
                         <include>**/server/master/ParamsTest.java</include>
                         <include>**/server/master/SubProcessTaskTest.java</include>
+                        <include>**/server/master/processor/TaskAckProcessorTest.java</include>
+                        <include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
                         <include>**/server/register/ZookeeperNodeManagerTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/server/utils/ExecutionContextTestUtils.java</include>