You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/10/14 14:11:59 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #550] Remove unused jar in plugin module (#551)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new b652be9  [ISSUE #550] Remove unused jar in plugin module (#551)
b652be9 is described below

commit b652be9f3cfee746272e87f8fc4b7f8b52bb05e5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Oct 14 22:11:53 2021 +0800

    [ISSUE #550] Remove unused jar in plugin module (#551)
    
    * optimize build.gradle, remove unused jar in plugin module
    
    * resolve license check
    
    * format code
    
    * fix doc
---
 build.gradle                                       |   1 -
 .../instructions/eventmesh-runtime-quickstart.md   |   4 +-
 .../instructions/eventmesh-runtime-quickstart.md   |   2 +-
 eventmesh-common/build.gradle                      |  40 +-
 .../eventmesh/common/command/HttpCommand.java      |  52 +--
 .../eventmesh/common/exception/JsonException.java  |  23 +-
 .../http/body/client/HeartbeatRequestBody.java     |  36 +-
 .../protocol/http/body/client/RegRequestBody.java  |  31 +-
 .../http/body/client/SubscribeRequestBody.java     |  29 +-
 .../http/body/client/UnRegRequestBody.java         |  31 +-
 .../http/body/client/UnSubscribeRequestBody.java   |  29 +-
 .../http/body/message/PushMessageRequestBody.java  |  44 +--
 .../http/body/message/ReplyMessageRequestBody.java |  42 ++-
 .../body/message/SendMessageBatchRequestBody.java  |  49 +--
 .../http/body/message/SendMessageRequestBody.java  |  49 +--
 .../apache/eventmesh/common/utils/JsonUtils.java   |  83 +++++
 .../eventmesh-connector-api/build.gradle           |   4 +-
 .../eventmesh-connector-rocketmq/build.gradle      |  35 +-
 .../connector/rocketmq/producer/ProducerImpl.java  |  84 +++--
 .../eventmesh-connector-standalone/build.gradle    |   4 +-
 eventmesh-examples/build.gradle                    |   6 +
 .../http/demo/sub/controller/SubController.java    |  16 +-
 .../eventmesh-registry-api/build.gradle            |   2 +-
 .../eventmesh-registry-namesrv/build.gradle        |   2 +-
 eventmesh-runtime/build.gradle                     |  29 +-
 .../protocol/http/consumer/ConsumerManager.java    | 120 +++---
 .../http/processor/SendSyncMessageProcessor.java   | 230 +++++++-----
 .../http/processor/SubscribeProcessor.java         | 135 ++++---
 .../http/processor/UnSubscribeProcessor.java       | 175 +++++----
 .../core/protocol/http/processor/inf/Client.java   |  48 +--
 .../protocol/http/push/AsyncHTTPPushRequest.java   | 136 ++++---
 .../tcp/client/group/ClientGroupWrapper.java       | 405 ++++++++++-----------
 eventmesh-schema-registry/build.gradle             |  16 -
 .../eventmesh-schema-registry-server/build.gradle  |   3 -
 eventmesh-sdk-java/build.gradle                    |  18 +
 .../client/http/consumer/LiteConsumer.java         | 169 +++++----
 .../client/http/producer/LiteProducer.java         | 176 +++++----
 .../producer/RRCallbackResponseHandlerAdapter.java |  45 ++-
 .../eventmesh-security-acl/build.gradle            |   2 +-
 .../eventmesh-security-api/build.gradle            |   2 +-
 eventmesh-spi/build.gradle                         |   2 +-
 .../spi/loader/JarExtensionClassLoader.java        |  52 ++-
 style/checkStyle.xml                               |  11 +-
 tool/license/allowed-licenses.txt                  |   4 +-
 44 files changed, 1387 insertions(+), 1089 deletions(-)

diff --git a/build.gradle b/build.gradle
index 889d2d5..32efd44 100644
--- a/build.gradle
+++ b/build.gradle
@@ -389,7 +389,6 @@ subprojects {
             dependency "org.apache.logging.log4j:log4j-web:2.13.3"
 
             dependency "com.lmax:disruptor:3.4.2"
-            dependency "com.alibaba:fastjson:1.2.71"
 
             dependency "com.fasterxml.jackson.core:jackson-databind:2.11.0"
             dependency "com.fasterxml.jackson.core:jackson-core:2.11.0"
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.md b/docs/cn/instructions/eventmesh-runtime-quickstart.md
index 6750674..684a0f6 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ Gradle至少为7.0, 推荐 7.0.*
 ```$ xslt
 unzip EventMesh-master.zip
 cd / *您的部署路径* /EventMesh-master
-gradle clean dist copyConnectorPlugin tar -x test
+gradle clean dist
 ```
 
 您将在目录/ *您的部署路径* /EventMesh-master/eventmesh-runtime/dist中获得**eventmesh-runtime_1.0.0.tar.gz**
@@ -68,7 +68,7 @@ sh start.sh
 > 插件实例需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件,文件名为SPI接口全类名.
 > 文件内容为插件实例名到插件实例的映射, 具体可以参考eventmesh-connector-rocketmq插件模块
 
-插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的copyConnectorPlugin任务
+插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的dist任务
 将插件拷贝至dist/plugin目录下, eventmesh默认会加载项目下dist/plugin目录下的插件, 加载目录可以通过-DeventMeshPluginDir=your_plugin_directory来改变插件目录.
 运行时需要使用的插件实例可以在eventmesh.properties中进行配置.如果需要使用rocketmq插件实行快速启动,需要在eventmesh-starter模块build.gradle中进行如下声明
 ```
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index e3f7228..dd33e72 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ You will get **EventMesh-master.zip**
 ```$xslt
 unzip EventMesh-master.zip
 cd /*YOUR DEPLOY PATH*/EventMesh-master
-gradle clean dist copyConnectorPlugin tar -x test
+gradle clean dist
 ```
 
 You will get **EventMesh_1.3.0-SNAPSHOT.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/build
diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle
index 3e160e0..b552781 100644
--- a/eventmesh-common/build.gradle
+++ b/eventmesh-common/build.gradle
@@ -18,36 +18,31 @@
 dependencies {
     api "org.apache.commons:commons-lang3"
     api "org.apache.commons:commons-collections4"
-    api "commons-io:commons-io"
-    api "org.apache.commons:commons-text"
-
     api "com.google.guava:guava"
-
     api "org.slf4j:slf4j-api"
-    api "org.apache.logging.log4j:log4j-api"
-    api "org.apache.logging.log4j:log4j-core"
-    api "org.apache.logging.log4j:log4j-core"
-    api "org.apache.logging.log4j:log4j-slf4j-impl"
-    api "org.apache.logging.log4j:log4j-web"
+    api "junit:junit"
+    api "org.assertj:assertj-core"
 
-    api "com.lmax:disruptor"
-    api "com.alibaba:fastjson"
+    implementation "commons-io:commons-io"
+    implementation "org.apache.commons:commons-text"
 
-    api "com.fasterxml.jackson.core:jackson-databind"
-    api "com.fasterxml.jackson.core:jackson-core"
-    api "com.fasterxml.jackson.core:jackson-annotations"
+    implementation "org.apache.logging.log4j:log4j-api"
+    implementation "org.apache.logging.log4j:log4j-core"
+    implementation "org.apache.logging.log4j:log4j-core"
+    implementation "org.apache.logging.log4j:log4j-slf4j-impl"
+    implementation "org.apache.logging.log4j:log4j-web"
 
-    api "org.apache.httpcomponents:httpclient"
+    implementation "com.lmax:disruptor"
 
-    api "io.netty:netty-all"
+    implementation "com.fasterxml.jackson.core:jackson-databind"
+    implementation "com.fasterxml.jackson.core:jackson-core"
+    implementation "com.fasterxml.jackson.core:jackson-annotations"
 
-    api "junit:junit"
-    api "com.github.stefanbirkner:system-rules"
-    api "org.assertj:assertj-core"
+    implementation "org.apache.httpcomponents:httpclient"
+
+    implementation "io.netty:netty-all"
 
-    api "org.mockito:mockito-core"
-    api "org.powermock:powermock-module-junit4"
-    api "org.powermock:powermock-api-mockito2"
+    implementation "com.github.stefanbirkner:system-rules"
 
     testImplementation "org.apache.commons:commons-lang3"
     testImplementation "org.apache.commons:commons-collections4"
@@ -64,7 +59,6 @@ dependencies {
     testImplementation "org.apache.logging.log4j:log4j-web"
 
     testImplementation "com.lmax:disruptor"
-    testImplementation "com.alibaba:fastjson"
 
     testImplementation "com.fasterxml.jackson.core:jackson-databind"
     testImplementation "com.fasterxml.jackson.core:jackson-core"
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
index 65bc770..b6c61d6 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
@@ -17,20 +17,27 @@
 
 package org.apache.eventmesh.common.command;
 
-import com.alibaba.fastjson.JSON;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http.*;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.http.body.BaseResponseBody;
 import org.apache.eventmesh.common.protocol.http.body.Body;
 import org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader;
 import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
 public class HttpCommand {
 
     private static AtomicLong requestId = new AtomicLong(0);
@@ -178,18 +185,18 @@ public class HttpCommand {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("httpCommand={")
-                .append(cmdType).append(",")
-                .append(httpMethod).append("/").append(httpVersion).append(",")
-                .append("requestCode=").append(requestCode).append(",")
-                .append("opaque=").append(opaque).append(",");
+            .append(cmdType).append(",")
+            .append(httpMethod).append("/").append(httpVersion).append(",")
+            .append("requestCode=").append(requestCode).append(",")
+            .append("opaque=").append(opaque).append(",");
 
         if (cmdType == CmdType.RES) {
             sb.append("cost=").append(resTime - reqTime).append(",");
         }
 
         sb.append("header=").append(header).append(",")
-                .append("body=").append(body)
-                .append("}");
+            .append("body=").append(body)
+            .append("}");
 
         return sb.toString();
     }
@@ -197,17 +204,17 @@ public class HttpCommand {
     public String abstractDesc() {
         StringBuilder sb = new StringBuilder();
         sb.append("httpCommand={")
-                .append(cmdType).append(",")
-                .append(httpMethod).append("/").append(httpVersion).append(",")
-                .append("requestCode=").append(requestCode).append(",")
-                .append("opaque=").append(opaque).append(",");
+            .append(cmdType).append(",")
+            .append(httpMethod).append("/").append(httpVersion).append(",")
+            .append("requestCode=").append(requestCode).append(",")
+            .append("opaque=").append(opaque).append(",");
 
         if (cmdType == CmdType.RES) {
             sb.append("cost=").append(resTime - reqTime).append(",");
         }
 
         sb.append("header=").append(header).append(",")
-                .append("bodySize=").append(body.toString().length()).append("}");
+            .append("bodySize=").append(body.toString().length()).append("}");
 
         return sb.toString();
     }
@@ -215,9 +222,9 @@ public class HttpCommand {
     public String simpleDesc() {
         StringBuilder sb = new StringBuilder();
         sb.append("httpCommand={")
-                .append(cmdType).append(",")
-                .append(httpMethod).append("/").append(httpVersion).append(",")
-                .append("requestCode=").append(requestCode).append("}");
+            .append(cmdType).append(",")
+            .append(httpMethod).append("/").append(httpVersion).append(",")
+            .append("requestCode=").append(requestCode).append("}");
 
         return sb.toString();
     }
@@ -228,10 +235,11 @@ public class HttpCommand {
         }
 
         DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-                HttpResponseStatus.OK,
-                Unpooled.wrappedBuffer(JSON.toJSONString(this.getBody()).getBytes(Constants.DEFAULT_CHARSET)));
+            HttpResponseStatus.OK,
+            Unpooled.wrappedBuffer(
+                JsonUtils.serialize(this.getBody()).getBytes(Constants.DEFAULT_CHARSET)));
         response.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN +
-                "; charset=" + Constants.DEFAULT_CHARSET);
+            "; charset=" + Constants.DEFAULT_CHARSET);
         response.headers().add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
         response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
         Map<String, Object> customHeader = this.getHeader().toMap();
diff --git a/eventmesh-schema-registry/build.gradle b/eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
similarity index 69%
copy from eventmesh-schema-registry/build.gradle
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
index 96a33a1..50f4bfa 100644
--- a/eventmesh-schema-registry/build.gradle
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-plugins {
-}
-
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
+package org.apache.eventmesh.common.exception;
 
-repositories {
-    mavenCentral()
-}
+/**
+ * Json format exception, see {@link org.apache.eventmesh.common.utils.JsonUtils}.
+ */
+public class JsonException extends RuntimeException {
 
-dependencies {
-}
+    public JsonException(String message) {
+        super(message);
+    }
 
-test {
-    useJUnitPlatform()
+    public JsonException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
index 751b632..0804098 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
@@ -17,21 +17,22 @@
 
 package org.apache.eventmesh.common.protocol.http.body.client;
 
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class HeartbeatRequestBody extends Body {
 
-    public static final String CLIENTTYPE = "clientType";
+    public static final String CLIENTTYPE        = "clientType";
     public static final String HEARTBEATENTITIES = "heartbeatEntities";
-    public static final String CONSUMERGROUP = "consumerGroup";
+    public static final String CONSUMERGROUP     = "consumerGroup";
 
     private String consumerGroup;
 
@@ -67,16 +68,19 @@ public class HeartbeatRequestBody extends Body {
         HeartbeatRequestBody body = new HeartbeatRequestBody();
         body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
         body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
-        body.setHeartbeatEntities(JSONArray.parseArray(MapUtils.getString(bodyParam, HEARTBEATENTITIES), HeartbeatEntity.class));
+        body.setHeartbeatEntities(JsonUtils
+            .deserialize(MapUtils.getString(bodyParam, HEARTBEATENTITIES),
+                new TypeReference<List<HeartbeatEntity>>() {
+                }));
         return body;
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(CLIENTTYPE, clientType);
         map.put(CONSUMERGROUP, consumerGroup);
-        map.put(HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
+        map.put(HEARTBEATENTITIES, JsonUtils.serialize(heartbeatEntities));
         return map;
     }
 
@@ -90,10 +94,10 @@ public class HeartbeatRequestBody extends Body {
         public String toString() {
             StringBuilder sb = new StringBuilder();
             sb.append("heartbeatEntity={")
-                    .append("topic=").append(topic).append(",")
-                    .append("serviceId=").append(serviceId).append(",")
-                    .append("instanceId=").append(instanceId).append(",")
-                    .append("url=").append(url).append("}");
+                .append("topic=").append(topic).append(",")
+                .append("serviceId=").append(serviceId).append(",")
+                .append("instanceId=").append(instanceId).append(",")
+                .append("url=").append(url).append("}");
             return sb.toString();
         }
     }
@@ -102,8 +106,8 @@ public class HeartbeatRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("heartbeatRequestBody={")
-                .append("consumerGroup=").append(consumerGroup).append(",")
-                .append("clientType=").append(clientType).append("}");
+            .append("consumerGroup=").append(consumerGroup).append(",")
+            .append("clientType=").append(clientType).append("}");
         return sb.toString();
     }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
index 78bb684..21a7beb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
@@ -17,16 +17,17 @@
 
 package org.apache.eventmesh.common.protocol.http.body.client;
 
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class RegRequestBody extends Body {
 
@@ -70,25 +71,27 @@ public class RegRequestBody extends Body {
         RegRequestBody body = new RegRequestBody();
         body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
         body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
+        body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPICS),
+            new TypeReference<List<SubscriptionItem>>() {
+            }));
         return body;
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(CLIENTTYPE, clientType);
         map.put(ENDPOINT, endPoint);
-        map.put(TOPICS, JSON.toJSONString(topics));
+        map.put(TOPICS, JsonUtils.serialize(topics));
         return map;
     }
 
     @Override
     public String toString() {
-        return "regRequestBody{" +
-                "clientType='" + clientType + '\'' +
-                ", endPoint='" + endPoint + '\'' +
-                ", topics=" + topics +
-                '}';
+        return "regRequestBody{"
+            + "clientType='" + clientType + '\''
+            + ", endPoint='" + endPoint + '\''
+            + ", topics=" + topics
+            + '}';
     }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
index 72e2e91..156cc3c 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
@@ -17,16 +17,17 @@
 
 package org.apache.eventmesh.common.protocol.http.body.client;
 
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class SubscribeRequestBody extends Body {
 
@@ -69,7 +70,9 @@ public class SubscribeRequestBody extends Body {
     public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
         SubscribeRequestBody body = new SubscribeRequestBody();
         body.setUrl(MapUtils.getString(bodyParam, URL));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
+        body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPIC),
+            new TypeReference<List<SubscriptionItem>>() {
+            }));
         body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
         return body;
     }
@@ -78,17 +81,17 @@ public class SubscribeRequestBody extends Body {
     public Map<String, Object> toMap() {
         Map<String, Object> map = new HashMap<String, Object>();
         map.put(URL, url);
-        map.put(TOPIC, JSON.toJSONString(topics));
+        map.put(TOPIC, JsonUtils.serialize(topics));
         map.put(CONSUMERGROUP, consumerGroup);
         return map;
     }
 
     @Override
     public String toString() {
-        return "subscribeBody{" +
-                "consumerGroup='" + consumerGroup + '\'' +
-                ", url='" + url + '\'' +
-                ", topics=" + topics +
-                '}';
+        return "subscribeBody{"
+            + "consumerGroup='" + consumerGroup + '\''
+            + ", url='" + url + '\''
+            + ", topics=" + topics
+            + '}';
     }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
index fe44595..b20f4c2 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
@@ -17,15 +17,16 @@
 
 package org.apache.eventmesh.common.protocol.http.body.client;
 
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class UnRegRequestBody extends Body {
 
@@ -56,15 +57,17 @@ public class UnRegRequestBody extends Body {
     public static UnRegRequestBody buildBody(Map<String, Object> bodyParam) {
         UnRegRequestBody body = new UnRegRequestBody();
         body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), UnRegTopicEntity.class));
+        body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPICS),
+            new TypeReference<List<UnRegTopicEntity>>() {
+            }));
         return body;
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(CLIENTTYPE, clientType);
-        map.put(TOPICS, JSON.toJSONString(topics));
+        map.put(TOPICS, JsonUtils.serialize(topics));
         return map;
     }
 
@@ -72,9 +75,9 @@ public class UnRegRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("regRequestBody={")
-                .append("clientType=").append(clientType)
-                .append("topics=").append(topics)
-                .append("}");
+            .append("clientType=").append(clientType)
+            .append("topics=").append(topics)
+            .append("}");
         return sb.toString();
     }
 
@@ -87,9 +90,9 @@ public class UnRegRequestBody extends Body {
         public String toString() {
             StringBuilder sb = new StringBuilder();
             sb.append("unRegTopicEntity={")
-                    .append("topic=").append(topic).append(",")
-                    .append("serviceId=").append(serviceId).append(",")
-                    .append("instanceId=").append(instanceId).append("}");
+                .append("topic=").append(topic).append(",")
+                .append("serviceId=").append(serviceId).append(",")
+                .append("instanceId=").append(instanceId).append("}");
             return sb.toString();
         }
     }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
index 756f1a9..5904a96 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
@@ -17,15 +17,16 @@
 
 package org.apache.eventmesh.common.protocol.http.body.client;
 
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class UnSubscribeRequestBody extends Body {
 
@@ -68,26 +69,28 @@ public class UnSubscribeRequestBody extends Body {
     public static UnSubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
         UnSubscribeRequestBody body = new UnSubscribeRequestBody();
         body.setUrl(MapUtils.getString(bodyParam, URL));
-        body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
+        body.setTopics(JsonUtils
+            .deserialize(MapUtils.getString(bodyParam, TOPIC), new TypeReference<List<String>>() {
+            }));
         body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
         return body;
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(URL, url);
-        map.put(TOPIC, JSON.toJSONString(topics));
+        map.put(TOPIC, JsonUtils.serialize(topics));
         map.put(CONSUMERGROUP, consumerGroup);
         return map;
     }
 
     @Override
     public String toString() {
-        return "unSubscribeRequestBody{" +
-                "consumerGroup='" + consumerGroup + '\'' +
-                ", url='" + url + '\'' +
-                ", topics=" + topics +
-                '}';
+        return "unSubscribeRequestBody{"
+            + "consumerGroup='" + consumerGroup + '\''
+            + ", url='" + url + '\''
+            + ", topics=" + topics
+            + '}';
     }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
index accefa6..fad1e95 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
@@ -17,23 +17,24 @@
 
 package org.apache.eventmesh.common.protocol.http.body.message;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class PushMessageRequestBody extends Body {
 
-    public static final String RANDOMNO = "randomNo";
-    public static final String TOPIC = "topic";
-    public static final String BIZSEQNO = "bizSeqNo";
-    public static final String UNIQUEID = "uniqueId";
-    public static final String CONTENT = "content";
+    public static final String RANDOMNO  = "randomNo";
+    public static final String TOPIC     = "topic";
+    public static final String BIZSEQNO  = "bizSeqNo";
+    public static final String UNIQUEID  = "uniqueId";
+    public static final String CONTENT   = "content";
     public static final String EXTFIELDS = "extFields";
 
     private String randomNo;
@@ -96,7 +97,6 @@ public class PushMessageRequestBody extends Body {
         this.extFields = extFields;
     }
 
-    @SuppressWarnings("unchecked")
     public static PushMessageRequestBody buildBody(final Map<String, Object> bodyParam) {
         PushMessageRequestBody pushMessageRequestBody = new PushMessageRequestBody();
         pushMessageRequestBody.setContent(MapUtils.getString(bodyParam, CONTENT));
@@ -107,20 +107,22 @@ public class PushMessageRequestBody extends Body {
         String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
 
         if (StringUtils.isNotBlank(extFields)) {
-            pushMessageRequestBody.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+            pushMessageRequestBody.setExtFields(
+                JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+                }));
         }
         return pushMessageRequestBody;
     }
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(RANDOMNO, randomNo);
         map.put(TOPIC, topic);
         map.put(CONTENT, content);
         map.put(BIZSEQNO, bizSeqNo);
         map.put(UNIQUEID, uniqueId);
-        map.put(EXTFIELDS, JSON.toJSONString(extFields));
+        map.put(EXTFIELDS, JsonUtils.serialize(extFields));
 
         return map;
     }
@@ -129,12 +131,12 @@ public class PushMessageRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("pushMessageRequestBody={")
-                .append("randomNo=").append(randomNo).append(",")
-                .append("topic=").append(topic).append(",")
-                .append("bizSeqNo=").append(bizSeqNo).append(",")
-                .append("uniqueId=").append(uniqueId).append(",")
-                .append("content=").append(content).append(",")
-                .append("extFields=").append(extFields).append("}");
+            .append("randomNo=").append(randomNo).append(",")
+            .append("topic=").append(topic).append(",")
+            .append("bizSeqNo=").append(bizSeqNo).append(",")
+            .append("uniqueId=").append(uniqueId).append(",")
+            .append("content=").append(content).append(",")
+            .append("extFields=").append(extFields).append("}");
         return sb.toString();
     }
 }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
index e2de019..93fa8b5 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
@@ -17,23 +17,24 @@
 
 package org.apache.eventmesh.common.protocol.http.body.message;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class ReplyMessageRequestBody extends Body {
 
-    public static final String ORIGTOPIC = "origTopic";
-    public static final String BIZSEQNO = "bizSeqNo";
-    public static final String UNIQUEID = "uniqueId";
-    public static final String CONTENT = "content";
-    public static final String EXTFIELDS = "extFields";
+    public static final String ORIGTOPIC     = "origTopic";
+    public static final String BIZSEQNO      = "bizSeqNo";
+    public static final String UNIQUEID      = "uniqueId";
+    public static final String CONTENT       = "content";
+    public static final String EXTFIELDS     = "extFields";
     public static final String PRODUCERGROUP = "producerGroup";
 
     private String bizSeqNo;
@@ -96,7 +97,6 @@ public class ReplyMessageRequestBody extends Body {
         this.producerGroup = producerGroup;
     }
 
-    @SuppressWarnings("unchecked")
     public static ReplyMessageRequestBody buildBody(Map<String, Object> bodyParam) {
         ReplyMessageRequestBody body = new ReplyMessageRequestBody();
         body.setBizSeqNo(MapUtils.getString(bodyParam, BIZSEQNO));
@@ -105,7 +105,9 @@ public class ReplyMessageRequestBody extends Body {
         body.setOrigTopic(MapUtils.getString(bodyParam, ORIGTOPIC));
         String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
         if (StringUtils.isNotBlank(extFields)) {
-            body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+            body.setExtFields(
+                JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+                }));
         }
         body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
         return body;
@@ -115,12 +117,12 @@ public class ReplyMessageRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("replyMessageRequestBody={")
-                .append("bizSeqNo=").append(bizSeqNo).append(",")
-                .append("uniqueId=").append(uniqueId).append(",")
-                .append("origTopic=").append(origTopic).append(",")
-                .append("content=").append(content).append(",")
-                .append("producerGroup=").append(producerGroup).append(",")
-                .append("extFields=").append(extFields).append("}");
+            .append("bizSeqNo=").append(bizSeqNo).append(",")
+            .append("uniqueId=").append(uniqueId).append(",")
+            .append("origTopic=").append(origTopic).append(",")
+            .append("content=").append(content).append(",")
+            .append("producerGroup=").append(producerGroup).append(",")
+            .append("extFields=").append(extFields).append("}");
         return sb.toString();
     }
 
@@ -131,7 +133,7 @@ public class ReplyMessageRequestBody extends Body {
         map.put(ORIGTOPIC, origTopic);
         map.put(UNIQUEID, uniqueId);
         map.put(CONTENT, content);
-        map.put(EXTFIELDS, JSON.toJSONString(extFields));
+        map.put(EXTFIELDS, JsonUtils.serialize(extFields));
         map.put(PRODUCERGROUP, producerGroup);
         return map;
     }
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
index 576e352..0b544c4 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
@@ -17,22 +17,23 @@
 
 package org.apache.eventmesh.common.protocol.http.body.message;
 
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class SendMessageBatchRequestBody extends Body {
 
-    public static final String BATCHID = "batchId";
-    public static final String CONTENTS = "contents";
-    public static final String SIZE = "size";
+    public static final String BATCHID       = "batchId";
+    public static final String CONTENTS      = "contents";
+    public static final String SIZE          = "size";
     public static final String PRODUCERGROUP = "producerGroup";
 
     private String batchId;
@@ -82,10 +83,10 @@ public class SendMessageBatchRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("sendMessageBatchRequestBody={")
-                .append("batchId=").append(batchId).append(",")
-                .append("size=").append(size).append(",")
-                .append("producerGroup=").append(producerGroup).append(",")
-                .append("contents=").append(JSON.toJSONString(contents)).append("}");
+            .append("batchId=").append(batchId).append(",")
+            .append("size=").append(size).append(",")
+            .append("producerGroup=").append(producerGroup).append(",")
+            .append("contents=").append(JsonUtils.serialize(contents)).append("}");
         return sb.toString();
     }
 
@@ -100,27 +101,29 @@ public class SendMessageBatchRequestBody extends Body {
         public String toString() {
             StringBuilder sb = new StringBuilder();
             sb.append("batchMessageEntity={")
-                    .append("bizSeqNo=").append(bizSeqNo).append(",")
-                    .append("topic=").append(topic).append(",")
-                    .append("msg=").append(msg).append(",")
-                    .append("ttl=").append(ttl).append(",")
-                    .append("tag=").append(tag).append("}");
+                .append("bizSeqNo=").append(bizSeqNo).append(",")
+                .append("topic=").append(topic).append(",")
+                .append("msg=").append(msg).append(",")
+                .append("ttl=").append(ttl).append(",")
+                .append("tag=").append(tag).append("}");
             return sb.toString();
         }
     }
 
     public static SendMessageBatchRequestBody buildBody(final Map<String, Object> bodyParam) {
         String batchId = MapUtils.getString(bodyParam,
-                BATCHID);
+            BATCHID);
         String size = StringUtils.isBlank(MapUtils.getString(bodyParam,
-                SIZE)) ? "1" : MapUtils.getString(bodyParam,
-                SIZE);
+            SIZE)) ? "1" : MapUtils.getString(bodyParam,
+            SIZE);
         String contents = MapUtils.getString(bodyParam,
-                CONTENTS, null);
+            CONTENTS, null);
         SendMessageBatchRequestBody body = new SendMessageBatchRequestBody();
         body.setBatchId(batchId);
         if (StringUtils.isNotBlank(contents)) {
-            body.setContents(JSONArray.parseArray(contents, BatchMessageEntity.class));
+            body.setContents(
+                JsonUtils.deserialize(contents, new TypeReference<List<BatchMessageEntity>>() {
+                }));
         }
         body.setSize(size);
         body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
index 5c302e8..196a9cc 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
@@ -17,24 +17,26 @@
 
 package org.apache.eventmesh.common.protocol.http.body.message;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
 
 public class SendMessageRequestBody extends Body {
 
-    public static final String TOPIC = "topic";
-    public static final String BIZSEQNO = "bizSeqNo";
-    public static final String UNIQUEID = "uniqueId";
-    public static final String CONTENT = "content";
-    public static final String TTL = "ttl";
-    public static final String TAG = "tag";
-    public static final String EXTFIELDS = "extFields";
+    public static final String TOPIC         = "topic";
+    public static final String BIZSEQNO      = "bizSeqNo";
+    public static final String UNIQUEID      = "uniqueId";
+    public static final String CONTENT       = "content";
+    public static final String TTL           = "ttl";
+    public static final String TAG           = "tag";
+    public static final String EXTFIELDS     = "extFields";
     public static final String PRODUCERGROUP = "producerGroup";
 
     private String topic;
@@ -117,7 +119,6 @@ public class SendMessageRequestBody extends Body {
         this.producerGroup = producerGroup;
     }
 
-    @SuppressWarnings("unchecked")
     public static SendMessageRequestBody buildBody(Map<String, Object> bodyParam) {
         SendMessageRequestBody body = new SendMessageRequestBody();
         body.setTopic(MapUtils.getString(bodyParam, TOPIC));
@@ -128,7 +129,9 @@ public class SendMessageRequestBody extends Body {
         body.setContent(MapUtils.getString(bodyParam, CONTENT));
         String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
         if (StringUtils.isNotBlank(extFields)) {
-            body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+            body.setExtFields(
+                JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+                }));
         }
         body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
         return body;
@@ -136,7 +139,7 @@ public class SendMessageRequestBody extends Body {
 
     @Override
     public Map<String, Object> toMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
+        Map<String, Object> map = new HashMap<>();
         map.put(TOPIC, topic);
         map.put(BIZSEQNO, bizSeqNo);
         map.put(UNIQUEID, uniqueId);
@@ -152,14 +155,14 @@ public class SendMessageRequestBody extends Body {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("sendMessageRequestBody={")
-                .append("topic=").append(topic).append(",")
-                .append("bizSeqNo=").append(bizSeqNo).append(",")
-                .append("uniqueId=").append(uniqueId).append(",")
-                .append("content=").append(content).append(",")
-                .append("ttl=").append(ttl).append(",")
-                .append("tag=").append(tag).append(",")
-                .append("producerGroup=").append(producerGroup).append(",")
-                .append("extFields=").append(extFields).append("}");
+            .append("topic=").append(topic).append(",")
+            .append("bizSeqNo=").append(bizSeqNo).append(",")
+            .append("uniqueId=").append(uniqueId).append(",")
+            .append("content=").append(content).append(",")
+            .append("ttl=").append(ttl).append(",")
+            .append("tag=").append(tag).append(",")
+            .append("producerGroup=").append(producerGroup).append(",")
+            .append("extFields=").append(extFields).append("}");
         return sb.toString();
     }
 
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
new file mode 100644
index 0000000..a9c3fc1
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.apache.eventmesh.common.exception.JsonException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * Static JSON helpers backed by one shared Jackson ObjectMapper instance.
+ */
+public class JsonUtils {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    // Only FAIL_ON_EMPTY_BEANS is changed below; no other mapper setting is touched here.
+    static {
+        OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+    }
+
+    /**
+     * Serialize an object to a JSON string via the shared mapper's writeValueAsString.
+     * A JsonProcessingException is rethrown as JsonException; no checked exception is declared.
+     * @param obj object to serialize
+     * @return JSON string
+     */
+    public static String serialize(Object obj) {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(obj);
+        } catch (JsonProcessingException e) {
+            throw new JsonException("serialize to json error", e);
+        }
+    }
+
+    /**
+     * Deserialize a JSON string into an instance of the given class.
+     * A JsonProcessingException is rethrown as JsonException built from the original exception.
+     * @param str JSON string to parse
+     * @param clz target class
+     * @param <T> target type
+     * @return the deserialized object
+     */
+    public static <T> T deserialize(String str, Class<T> clz) {
+        try {
+            return OBJECT_MAPPER.readValue(str, clz);
+        } catch (JsonProcessingException e) {
+            throw new JsonException("deserialize json string to object error", e);
+        }
+    }
+
+    /**
+     * Deserialize a JSON string into the type described by typeReference.
+     * NOTE(review): no DeserializationFeature is set in this class; confirm defaults are intended.
+     * @param str           JSON string to parse
+     * @param typeReference reference describing the target type
+     * @param <T>           target type
+     * @return the deserialized object; a JsonProcessingException is rethrown as JsonException
+     */
+    public static <T> T deserialize(String str, TypeReference<T> typeReference) {
+        try {
+            return OBJECT_MAPPER.readValue(str, typeReference);
+        } catch (JsonProcessingException e) {
+            throw new JsonException("deserialize json string to object error", e);
+        }
+    }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
index 4b88cfd..962a46a 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
@@ -16,7 +16,8 @@
  */
 
 dependencies {
-    api project(":eventmesh-spi")
+    compileOnly project(":eventmesh-spi")
+    compileOnly project(":eventmesh-common")
     api 'io.openmessaging:openmessaging-api'
     api 'io.dropwizard.metrics:metrics-core'
     api "io.dropwizard.metrics:metrics-healthchecks"
@@ -24,6 +25,7 @@ dependencies {
     api "io.dropwizard.metrics:metrics-json"
 
     testImplementation project(":eventmesh-spi")
+    testImplementation project(":eventmesh-common")
     testImplementation 'io.openmessaging:openmessaging-api'
     testImplementation 'io.dropwizard.metrics:metrics-core'
     testImplementation "io.dropwizard.metrics:metrics-healthchecks"
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
index 5123f8d..7f603d8 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
@@ -27,26 +27,33 @@ configurations {
 }
 
 List rocketmq = [
-        "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-test:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
-        "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-test:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
+    "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
 
 ]
 
 dependencies {
-    api project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    compileOnly project(":eventmesh-common")
+    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
     implementation rocketmq
 
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation project(":eventmesh-common")
+
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.powermock:powermock-module-junit4"
+    testImplementation "org.powermock:powermock-api-mockito2"
+
     testImplementation rocketmq
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
index c34a991..dbad619 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
@@ -17,31 +17,29 @@
 
 package org.apache.eventmesh.connector.rocketmq.producer;
 
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.api.Message;
-import io.openmessaging.api.MessageBuilder;
-import io.openmessaging.api.OnExceptionContext;
-import io.openmessaging.api.Producer;
-import io.openmessaging.api.SendCallback;
-import io.openmessaging.api.SendResult;
-import io.openmessaging.api.exception.OMSRuntimeException;
-
 import org.apache.eventmesh.api.RRCallback;
 import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.RequestCallback;
-import org.apache.rocketmq.client.producer.RequestResponseFuture;
-import org.apache.rocketmq.client.utils.MessageUtil;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessageBuilder;
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.Producer;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import io.openmessaging.api.exception.OMSRuntimeException;
 
 public class ProducerImpl extends AbstractOMSProducer implements Producer {
 
@@ -63,7 +61,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         super.getRocketmqProducer().setPollNameServerInterval(60000);
 
         super.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory()
-                .getNettyClientConfig().setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold);
+            .getNettyClientConfig()
+            .setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold);
         super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
     }
 
@@ -73,11 +72,12 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
 
         try {
-            org.apache.rocketmq.client.producer.SendResult sendResultRMQ = this.rocketmqProducer.send(msgRMQ);
-            message.setMsgID(sendResultRMQ.getMsgId());
+            org.apache.rocketmq.client.producer.SendResult sendResultRmq =
+                this.rocketmqProducer.send(msgRMQ);
+            message.setMsgID(sendResultRmq.getMsgId());
             SendResult sendResult = new SendResult();
-            sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());
-            sendResult.setMessageId(sendResultRMQ.getMsgId());
+            sendResult.setTopic(sendResultRmq.getMessageQueue().getTopic());
+            sendResult.setMessageId(sendResultRmq.getMsgId());
             return sendResult;
         } catch (Exception e) {
             log.error(String.format("Send message Exception, %s", message), e);
@@ -114,7 +114,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
     }
 
     public void request(Message message, RRCallback rrCallback, long timeout)
-            throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
 
         this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
@@ -133,7 +133,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
             public void onException(Throwable e) {
                 String topic = message.getTopic();
                 String msgId = message.getMsgID();
-                OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
+                OMSRuntimeException onsEx =
+                    ProducerImpl.this.checkProducerException(topic, msgId, e);
                 OnExceptionContext context = new OnExceptionContext();
                 context.setTopic(topic);
                 context.setMessageId(msgId);
@@ -144,25 +145,28 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
         };
     }
 
-    private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, final SendCallback sendCallback) {
-        org.apache.rocketmq.client.producer.SendCallback rmqSendCallback = new org.apache.rocketmq.client.producer.SendCallback() {
-            @Override
-            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
-                sendCallback.onSuccess(OMSUtil.sendResultConvert(sendResult));
-            }
-
-            @Override
-            public void onException(Throwable e) {
-                String topic = message.getTopic();
-                String msgId = message.getMsgID();
-                OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
-                OnExceptionContext context = new OnExceptionContext();
-                context.setTopic(topic);
-                context.setMessageId(msgId);
-                context.setException(onsEx);
-                sendCallback.onException(context);
-            }
-        };
+    private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(
+        final Message message, final SendCallback sendCallback) {
+        org.apache.rocketmq.client.producer.SendCallback rmqSendCallback =
+            new org.apache.rocketmq.client.producer.SendCallback() {
+                @Override
+                public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+                    sendCallback.onSuccess(OMSUtil.sendResultConvert(sendResult));
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    String topic = message.getTopic();
+                    String msgId = message.getMsgID();
+                    OMSRuntimeException onsEx =
+                        ProducerImpl.this.checkProducerException(topic, msgId, e);
+                    OnExceptionContext context = new OnExceptionContext();
+                    context.setTopic(topic);
+                    context.setMessageId(msgId);
+                    context.setException(onsEx);
+                    sendCallback.onException(context);
+                }
+            };
         return rmqSendCallback;
     }
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
index b9e1423..74fec4d 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
@@ -16,7 +16,9 @@
  */
 
 dependencies {
-    api project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    compileOnly project(":eventmesh-common")
+    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
 
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation project(":eventmesh-common")
 }
\ No newline at end of file
diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle
index 32bfc3c..f62d019 100644
--- a/eventmesh-examples/build.gradle
+++ b/eventmesh-examples/build.gradle
@@ -22,11 +22,17 @@ dependencies {
 //    compile log4j2, sl4j
 //    testCompile log4j2, sl4j
     implementation project(":eventmesh-sdk-java")
+    implementation project(":eventmesh-common")
     implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
     implementation 'org.springframework.boot:spring-boot-starter-web'
+    implementation 'io.netty:netty-all'
+
     testImplementation project(":eventmesh-sdk-java")
+    testImplementation project(":eventmesh-common")
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
     testImplementation 'org.springframework.boot:spring-boot-starter-web'
+    testImplementation 'io.netty:netty-all'
+
 }
 
 configurations.all {
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
index 92ca09d..aa400ce 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
@@ -17,9 +17,12 @@
 
 package org.apache.eventmesh.http.demo.sub.controller;
 
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.http.demo.sub.service.SubService;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -28,7 +31,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
-
 @RestController
 @RequestMapping("/sub")
 public class SubController {
@@ -40,12 +42,12 @@ public class SubController {
 
     @RequestMapping(value = "/test", method = RequestMethod.POST)
     public String subTest(@RequestBody String message) {
-        logger.info("=======receive message======= {}", JSONObject.toJSONString(message));
+        logger.info("=======receive message======= {}", JsonUtils.serialize(message));
         subService.consumeMessage(message);
 
-        JSONObject result = new JSONObject();
-        result.put("retCode", 1);
-        return result.toJSONString();
+        Map<String, Object> map = new HashMap<>();
+        map.put("retCode", 1);
+        return JsonUtils.serialize(map);
     }
 
 }
diff --git a/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle b/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
index 0d41042..926cdba 100644
--- a/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
+++ b/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    api project(":eventmesh-spi")
+    compileOnly project(":eventmesh-spi")
 
     testImplementation project(":eventmesh-spi")
 }
diff --git a/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle b/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
index ec1fbb2..33422de 100644
--- a/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
+++ b/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 dependencies {
-    api project(":eventmesh-registry-plugin:eventmesh-registry-api")
+    implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
 
     testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
 }
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index df1d212..2a58e49 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -16,13 +16,19 @@
  */
 
 dependencies {
-    api 'io.opentelemetry:opentelemetry-api'
-    api 'io.opentelemetry:opentelemetry-sdk'
-    api 'io.opentelemetry:opentelemetry-sdk-metrics'
-    api 'io.opentelemetry:opentelemetry-exporter-prometheus'
-    api 'io.prometheus:simpleclient'
-    api 'io.prometheus:simpleclient_httpserver'
-    api 'io.cloudevents:cloudevents-core'
+    implementation 'io.opentelemetry:opentelemetry-api'
+    implementation 'io.opentelemetry:opentelemetry-sdk'
+    implementation 'io.opentelemetry:opentelemetry-sdk-metrics'
+    implementation 'io.opentelemetry:opentelemetry-exporter-prometheus'
+    implementation 'io.prometheus:simpleclient'
+    implementation 'io.prometheus:simpleclient_httpserver'
+    implementation 'io.cloudevents:cloudevents-core'
+    
+    implementation "org.apache.httpcomponents:httpclient"
+    implementation 'io.netty:netty-all'
+
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-spi")
     implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
     implementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
     implementation project(":eventmesh-security-plugin:eventmesh-security-api")
@@ -30,10 +36,19 @@ dependencies {
     implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
     implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
 
+    testImplementation project(":eventmesh-common")
+    testImplementation project(":eventmesh-spi")
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
     testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
     testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl")
     testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
     testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
+
+    testImplementation "org.apache.httpcomponents:httpclient"
+    testImplementation "io.netty:netty-all"
+
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.powermock:powermock-module-junit4"
+    testImplementation "org.powermock:powermock-api-mockito2"
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
index 8857481..4c23663 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
@@ -17,6 +17,16 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.consumer;
 
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
+import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupTopicConfChangeEvent;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
+
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,30 +40,27 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
-import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
-import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupTopicConfChangeEvent;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.Subscribe;
+
 public class ConsumerManager {
 
     private EventMeshHTTPServer eventMeshHTTPServer;
 
-    private ConcurrentHashMap<String /** consumerGroup */, ConsumerGroupManager> consumerTable = new ConcurrentHashMap<String, ConsumerGroupManager>();
+    /**
+     * consumerGroup to ConsumerGroupManager.
+     */
+    private ConcurrentHashMap<String, ConsumerGroupManager> consumerTable =
+        new ConcurrentHashMap<>();
 
     private static final int DEFAULT_UPDATE_TIME = 3 * 30 * 1000;
 
     public Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    private ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor();
 
     public ConsumerManager(EventMeshHTTPServer eventMeshHTTPServer) {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
@@ -71,7 +78,8 @@ public class ConsumerManager {
             public void run() {
                 logger.info("clientInfo check start.....");
                 synchronized (eventMeshHTTPServer.localClientInfoMapping) {
-                    Map<String, List<Client>> clientInfoMap = eventMeshHTTPServer.localClientInfoMapping;
+                    Map<String, List<Client>> clientInfoMap =
+                        eventMeshHTTPServer.localClientInfoMapping;
                     if (clientInfoMap.size() > 0) {
                         for (String key : clientInfoMap.keySet()) {
                             String consumerGroup = key.split("@")[0];
@@ -82,9 +90,11 @@ public class ConsumerManager {
                             while (clientIterator.hasNext()) {
                                 Client client = clientIterator.next();
                                 //The time difference is greater than 3 heartbeat cycles
-                                if (System.currentTimeMillis() - client.lastUpTime.getTime() > DEFAULT_UPDATE_TIME) {
-                                    logger.warn("client {} lastUpdate time {} over three heartbeat cycles",
-                                            JSONObject.toJSONString(client), client.lastUpTime);
+                                if (System.currentTimeMillis() - client.lastUpTime.getTime()
+                                    > DEFAULT_UPDATE_TIME) {
+                                    logger.warn(
+                                        "client {} lastUpdate time {} over three heartbeat cycles",
+                                        JsonUtils.serialize(client), client.lastUpTime);
                                     clientIterator.remove();
                                     isChange = true;
                                 }
@@ -92,13 +102,15 @@ public class ConsumerManager {
                             if (isChange) {
                                 if (clientList.size() > 0) {
                                     //change url
-                                    logger.info("consumerGroup {} client info changing", consumerGroup);
+                                    logger.info("consumerGroup {} client info changing",
+                                        consumerGroup);
                                     Map<String, List<String>> idcUrls = new HashMap<>();
                                     Set<String> clientUrls = new HashSet<>();
                                     for (Client client : clientList) {
                                         clientUrls.add(client.url);
                                         if (idcUrls.containsKey(client.idc)) {
-                                            idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+                                            idcUrls.get(client.idc)
+                                                .add(StringUtils.deleteWhitespace(client.url));
                                         } else {
                                             List<String> urls = new ArrayList<>();
                                             urls.add(client.url);
@@ -106,14 +118,19 @@ public class ConsumerManager {
                                         }
                                     }
                                     synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
-                                        ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
-                                        Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+                                        ConsumerGroupConf consumerGroupConf =
+                                            eventMeshHTTPServer.localConsumerGroupMapping
+                                                .get(consumerGroup);
+                                        Map<String, ConsumerGroupTopicConf> map =
+                                            consumerGroupConf.getConsumerGroupTopicConf();
                                         for (String topicKey : map.keySet()) {
                                             if (StringUtils.equals(topic, topicKey)) {
-                                                ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+                                                ConsumerGroupTopicConf latestTopicConf =
+                                                    new ConsumerGroupTopicConf();
                                                 latestTopicConf.setConsumerGroup(consumerGroup);
                                                 latestTopicConf.setTopic(topic);
-                                                latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
+                                                latestTopicConf.setSubscriptionItem(
+                                                    map.get(topicKey).getSubscriptionItem());
                                                 latestTopicConf.setUrls(clientUrls);
 
                                                 latestTopicConf.setIdcUrls(idcUrls);
@@ -121,26 +138,31 @@ public class ConsumerManager {
                                                 map.put(topic, latestTopicConf);
                                             }
                                         }
-                                        eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
-                                        logger.info("consumerGroup {} client info changed, consumerGroupConf {}", consumerGroup,
-                                                JSONObject.toJSONString(consumerGroupConf));
+                                        eventMeshHTTPServer.localConsumerGroupMapping
+                                            .put(consumerGroup, consumerGroupConf);
+                                        logger.info(
+                                            "consumerGroup {} client info changed, "
+                                                + "consumerGroupConf {}", consumerGroup,
+                                            JsonUtils.serialize(consumerGroupConf));
                                         try {
-                                            notifyConsumerManager(consumerGroup, consumerGroupConf, eventMeshHTTPServer.localConsumerGroupMapping);
+                                            notifyConsumerManager(consumerGroup, consumerGroupConf);
                                         } catch (Exception e) {
                                             e.printStackTrace();
                                         }
                                     }
 
                                 } else {
-                                    logger.info("consumerGroup {} client info removed", consumerGroup);
+                                    logger.info("consumerGroup {} client info removed",
+                                        consumerGroup);
                                     //remove
                                     try {
-                                        notifyConsumerManager(consumerGroup, null, eventMeshHTTPServer.localConsumerGroupMapping);
+                                        notifyConsumerManager(consumerGroup, null);
                                     } catch (Exception e) {
                                         e.printStackTrace();
                                     }
 
-                                    eventMeshHTTPServer.localConsumerGroupMapping.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s));
+                                    eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+                                        .removeIf(s -> StringUtils.equals(consumerGroup, s));
                                 }
                             }
 
@@ -154,9 +176,11 @@ public class ConsumerManager {
     /**
      * notify ConsumerManager groupLevel
      */
-    public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig,
-                                      ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping) throws Exception {
-        ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
+    public void notifyConsumerManager(String consumerGroup,
+                                      ConsumerGroupConf latestConsumerGroupConfig)
+        throws Exception {
+        ConsumerGroupManager cgm =
+            eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
         if (latestConsumerGroupConfig == null) {
             ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
             notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
@@ -207,8 +231,10 @@ public class ConsumerManager {
      * @param consumerGroupConfig
      * @throws Exception
      */
-    public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
-        ConsumerGroupManager cgm = new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
+    public synchronized void addConsumer(String consumerGroup,
+                                         ConsumerGroupConf consumerGroupConfig) throws Exception {
+        ConsumerGroupManager cgm =
+            new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
         cgm.init();
         cgm.start();
         consumerTable.put(consumerGroup, cgm);
@@ -217,8 +243,10 @@ public class ConsumerManager {
     /**
      * restart consumer
      */
-    public synchronized void restartConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
-        if(consumerTable.containsKey(consumerGroup)) {
+    public synchronized void restartConsumer(String consumerGroup,
+                                             ConsumerGroupConf consumerGroupConfig)
+        throws Exception {
+        if (consumerTable.containsKey(consumerGroup)) {
             ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
             cgm.refresh(consumerGroupConfig);
         }
@@ -239,9 +267,10 @@ public class ConsumerManager {
      */
     public synchronized void delConsumer(String consumerGroup) throws Exception {
         logger.info("start delConsumer with consumerGroup {}", consumerGroup);
-        if(consumerTable.containsKey(consumerGroup)) {
+        if (consumerTable.containsKey(consumerGroup)) {
             ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
-            logger.info("start unsubscribe topic with consumer group manager {}", JSONObject.toJSONString(cgm));
+            logger.info("start unsubscribe topic with consumer group manager {}",
+                JsonUtils.serialize(cgm));
             cgm.unsubscribe(consumerGroup);
             cgm.shutdown();
         }
@@ -252,25 +281,30 @@ public class ConsumerManager {
     public void onChange(ConsumerGroupTopicConfChangeEvent event) {
         try {
             logger.info("onChange event:{}", event);
-            if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
+            if (event.action
+                == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
                 ConsumerGroupManager manager = getConsumer(event.consumerGroup);
                 if (Objects.isNull(manager)) {
                     return;
                 }
-                manager.getConsumerGroupConfig().getConsumerGroupTopicConf().put(event.topic, event.newTopicConf);
+                manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
+                    .put(event.topic, event.newTopicConf);
                 return;
             }
 
-            if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
+            if (event.action
+                == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
                 ConsumerGroupManager manager = getConsumer(event.consumerGroup);
                 if (Objects.isNull(manager)) {
                     return;
                 }
-                manager.getConsumerGroupConfig().getConsumerGroupTopicConf().replace(event.topic, event.newTopicConf);
+                manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
+                    .replace(event.topic, event.newTopicConf);
                 return;
             }
 
-            if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
+            if (event.action
+                == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
                 ConsumerGroupManager manager = getConsumer(event.consumerGroup);
                 if (Objects.isNull(manager)) {
                     return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 38f3f25..79f391e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -17,10 +17,6 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor;
 
-import com.alibaba.fastjson.JSON;
-import io.netty.channel.ChannelHandlerContext;
-import io.openmessaging.api.Message;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.api.RRCallback;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.IPUtil;
@@ -32,6 +28,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -43,10 +40,16 @@ import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageConte
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.OMSUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import io.netty.channel.ChannelHandlerContext;
+import io.openmessaging.api.Message;
 
 public class SendSyncMessageProcessor implements HttpRequestProcessor {
 
@@ -65,48 +68,59 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
     }
 
     @Override
-    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+        throws Exception {
 
         HttpCommand responseEventMeshCommand;
 
-        cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
-                EventMeshConstants.PROTOCOL_HTTP,
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+        cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+            RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+            EventMeshConstants.PROTOCOL_HTTP,
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
 
-        SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
-        SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) asyncContext.getRequest().getBody();
+        SendMessageRequestHeader sendMessageRequestHeader =
+            (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
+        SendMessageRequestBody sendMessageRequestBody =
+            (SendMessageRequestBody) asyncContext.getRequest().getBody();
 
         SendMessageResponseHeader sendMessageResponseHeader =
-                SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
-                        IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+            SendMessageResponseHeader
+                .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+                    IPUtil.getLocalAddress(),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
 
         if (StringUtils.isBlank(sendMessageRequestHeader.getIdc())
-                || StringUtils.isBlank(sendMessageRequestHeader.getPid())
-                || !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
-                || StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
+            || StringUtils.isBlank(sendMessageRequestHeader.getPid())
+            || !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
+            || StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo())
-                || StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
-                || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
-                || StringUtils.isBlank(sendMessageRequestBody.getTopic())
-                || StringUtils.isBlank(sendMessageRequestBody.getContent())
-                || (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
+            || StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
+            || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
+            || StringUtils.isBlank(sendMessageRequestBody.getTopic())
+            || StringUtils.isBlank(sendMessageRequestBody.getContent())
+            || (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         //do acl check
-        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
             String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
             String user = sendMessageRequestHeader.getUsername();
             String pass = sendMessageRequestHeader.getPasswd();
@@ -115,12 +129,13 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             String topic = sendMessageRequestBody.getTopic();
             try {
                 Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
-            }catch (Exception e){
-                //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+            } catch (Exception e) {
 
                 responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                        sendMessageResponseHeader,
-                        SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                    sendMessageResponseHeader,
+                    SendMessageResponseBody
+                        .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
+                            e.getMessage()));
                 asyncContext.onComplete(responseEventMeshCommand);
                 aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
                 return;
@@ -129,67 +144,84 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
 
         // control flow rate limit
         if (!eventMeshHTTPServer.getMsgRateLimiter()
-                .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
+            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS,
+                TimeUnit.MILLISECONDS)) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
-                            EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
             eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard();
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         String producerGroup = sendMessageRequestBody.getProducerGroup();
-        EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
+        EventMeshProducer eventMeshProducer =
+            eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
         if (!eventMeshProducer.getStarted().get()) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
-        if (StringUtils.isNotBlank(sendMessageRequestBody.getTtl()) && StringUtils.isNumeric(sendMessageRequestBody.getTtl())) {
+        if (StringUtils.isNotBlank(sendMessageRequestBody.getTtl())
+            && StringUtils.isNumeric(sendMessageRequestBody.getTtl())) {
             ttl = sendMessageRequestBody.getTtl();
         }
 
         Message omsMsg = new Message();
         try {
             // body
-            omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
+            omsMsg.setBody(
+                sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
             // topic
             omsMsg.setTopic(sendMessageRequestBody.getTopic());
-            omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, sendMessageRequestBody.getTopic());
+            omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION,
+                sendMessageRequestBody.getTopic());
             if (!StringUtils.isBlank(sendMessageRequestBody.getTag())) {
                 omsMsg.putUserProperties("Tag", sendMessageRequestBody.getTag());
             }
             // ttl
             omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_TIMEOUT, ttl);
             // bizNo
-            omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo());
+            omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS,
+                sendMessageRequestBody.getBizSeqNo());
             omsMsg.putUserProperties("msgType", "persistent");
-            omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+            omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
+                String.valueOf(System.currentTimeMillis()));
             omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
-//            omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
 
             if (messageLogger.isDebugEnabled()) {
-                messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
-                        sendMessageRequestBody.getTopic());
+                messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}",
+                    sendMessageRequestBody.getBizSeqNo(),
+                    sendMessageRequestBody.getTopic());
             }
-            omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+            omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
+                String.valueOf(System.currentTimeMillis()));
         } catch (Exception e) {
-            messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
+            messageLogger
+                .error("msg2MQMsg err, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
                     sendMessageRequestBody.getTopic(), e);
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg()
+                            + EventMeshUtil.stackTrace(e, 2)));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
-        final SendMessageContext sendMessageContext = new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer, eventMeshHTTPServer);
+        final SendMessageContext sendMessageContext =
+            new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer,
+                eventMeshHTTPServer);
         eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsg();
 
         long startTime = System.currentTimeMillis();
@@ -202,46 +234,58 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                         httpLogger.debug("{}", httpCommand);
                     }
                     eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
-                    eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+                    eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+                        System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                 } catch (Exception ex) {
+                    // ignore
                 }
             }
         };
 
         LiteMessage liteMessage = new LiteMessage(sendMessageRequestBody.getBizSeqNo(),
-                sendMessageRequestBody.getUniqueId(), sendMessageRequestBody.getTopic(),
-                sendMessageRequestBody.getContent())
-                .setProp(sendMessageRequestBody.getExtFields());
+            sendMessageRequestBody.getUniqueId(), sendMessageRequestBody.getTopic(),
+            sendMessageRequestBody.getContent())
+            .setProp(sendMessageRequestBody.getExtFields());
 
         try {
             eventMeshProducer.request(sendMessageContext, new RRCallback() {
                 @Override
                 public void onSuccess(Message omsMsg) {
-                    omsMsg.getUserProperties().put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, omsMsg.getSystemProperties("BORN_TIMESTAMP"));
-                    omsMsg.getUserProperties().put(EventMeshConstants.STORE_TIMESTAMP, omsMsg.getSystemProperties("STORE_TIMESTAMP"));
-                    omsMsg.getUserProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-                    messageLogger.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                            System.currentTimeMillis() - startTime,
-                            sendMessageRequestBody.getTopic(),
-                            sendMessageRequestBody.getBizSeqNo(),
-                            sendMessageRequestBody.getUniqueId());
+                    omsMsg.getUserProperties().put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
+                        omsMsg.getSystemProperties("BORN_TIMESTAMP"));
+                    omsMsg.getUserProperties().put(EventMeshConstants.STORE_TIMESTAMP,
+                        omsMsg.getSystemProperties("STORE_TIMESTAMP"));
+                    omsMsg.getUserProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()));
+                    messageLogger.info(
+                        "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+                            + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+                        sendMessageRequestBody.getTopic(),
+                        sendMessageRequestBody.getBizSeqNo(),
+                        sendMessageRequestBody.getUniqueId());
 
                     try {
-                        final String rtnMsg = new String(omsMsg.getBody(), EventMeshConstants.DEFAULT_CHARSET);
-                        omsMsg.getUserProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+                        final String rtnMsg =
+                            new String(omsMsg.getBody(), EventMeshConstants.DEFAULT_CHARSET);
+                        omsMsg.getUserProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
+                            String.valueOf(System.currentTimeMillis()));
                         HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
-                                sendMessageResponseHeader,
-                                SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
-                                        JSON.toJSONString(new SendMessageResponseBody.ReplyMessage(omsMsg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), rtnMsg,
-                                                OMSUtil.combineProp(omsMsg.getSystemProperties(),
-                                                        omsMsg.getUserProperties()))
-                                        )));
+                            sendMessageResponseHeader,
+                            SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
+                                JsonUtils.serialize(new SendMessageResponseBody.ReplyMessage(omsMsg
+                                    .getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
+                                    rtnMsg,
+                                    OMSUtil.combineProp(omsMsg.getSystemProperties(),
+                                        omsMsg.getUserProperties()))
+                                )));
                         asyncContext.onComplete(succ, handler);
                     } catch (Exception ex) {
                         HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                                sendMessageResponseHeader,
-                                SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
-                                        EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
+                            sendMessageResponseHeader,
+                            SendMessageResponseBody.buildBody(
+                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
+                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
+                                    + EventMeshUtil.stackTrace(ex, 2)));
                         asyncContext.onComplete(err, handler);
                         messageLogger.warn("message|mq2eventMesh|RSP", ex);
                     }
@@ -250,35 +294,41 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
                 @Override
                 public void onException(Throwable e) {
                     HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                            sendMessageResponseHeader,
-                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
-                                    EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+                        sendMessageResponseHeader,
+                        SendMessageResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
+                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
+                                    + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err, handler);
 
                     eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
-                    messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                            System.currentTimeMillis() - startTime,
-                            sendMessageRequestBody.getTopic(),
-                            sendMessageRequestBody.getBizSeqNo(),
-                            sendMessageRequestBody.getUniqueId(), e);
+                    messageLogger.error(
+                        "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+                            + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+                        sendMessageRequestBody.getTopic(),
+                        sendMessageRequestBody.getBizSeqNo(),
+                        sendMessageRequestBody.getUniqueId(), e);
                 }
             }, Integer.valueOf(ttl));
         } catch (Exception ex) {
             HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                    sendMessageResponseHeader,
-                    SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
-                            EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
+                sendMessageResponseHeader,
+                SendMessageResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
+                            + EventMeshUtil.stackTrace(ex, 2)));
             asyncContext.onComplete(err);
 
             eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
             long endTime = System.currentTimeMillis();
             eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
             eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
-            messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                    endTime - startTime,
-                    sendMessageRequestBody.getTopic(),
-                    sendMessageRequestBody.getBizSeqNo(),
-                    sendMessageRequestBody.getUniqueId(), ex);
+            messageLogger.error(
+                "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+                endTime - startTime,
+                sendMessageRequestBody.getTopic(),
+                sendMessageRequestBody.getBizSeqNo(),
+                sendMessageRequestBody.getUniqueId(), ex);
         }
 
         return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 14e185e..4be4fb2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -17,17 +17,6 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import com.alibaba.fastjson.JSONObject;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -38,6 +27,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.client.SubscribeRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.client.SubscribeResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -49,9 +39,23 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public class SubscribeProcessor implements HttpRequestProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger("http");
@@ -65,63 +69,76 @@ public class SubscribeProcessor implements HttpRequestProcessor {
     }
 
     @Override
-    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+        throws Exception {
         HttpCommand responseEventMeshCommand;
-        httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
-                EventMeshConstants.PROTOCOL_HTTP,
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
-        SubscribeRequestHeader subscribeRequestHeader = (SubscribeRequestHeader) asyncContext.getRequest().getHeader();
-        SubscribeRequestBody subscribeRequestBody = (SubscribeRequestBody) asyncContext.getRequest().getBody();
+        httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+            RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+            EventMeshConstants.PROTOCOL_HTTP,
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+        SubscribeRequestHeader subscribeRequestHeader =
+            (SubscribeRequestHeader) asyncContext.getRequest().getHeader();
+        SubscribeRequestBody subscribeRequestBody =
+            (SubscribeRequestBody) asyncContext.getRequest().getBody();
 
         SubscribeResponseHeader subscribeResponseHeader =
-                SubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
-                        IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
-
+            SubscribeResponseHeader
+                .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+                    IPUtil.getLocalAddress(),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
 
         //validate header
         if (StringUtils.isBlank(subscribeRequestHeader.getIdc())
-                || StringUtils.isBlank(subscribeRequestHeader.getPid())
-                || !StringUtils.isNumeric(subscribeRequestHeader.getPid())
-                || StringUtils.isBlank(subscribeRequestHeader.getSys())) {
+            || StringUtils.isBlank(subscribeRequestHeader.getPid())
+            || !StringUtils.isNumeric(subscribeRequestHeader.getPid())
+            || StringUtils.isBlank(subscribeRequestHeader.getSys())) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    subscribeResponseHeader,
-                    SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+                subscribeResponseHeader,
+                SubscribeResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         //validate body
         if (StringUtils.isBlank(subscribeRequestBody.getUrl())
-                || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
-                || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
+            || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
+            || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
 
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    subscribeResponseHeader,
-                    SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+                subscribeResponseHeader,
+                SubscribeResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
         List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
 
         //do acl check
-        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
             String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
             String user = subscribeRequestHeader.getUsername();
             String pass = subscribeRequestHeader.getPasswd();
             String subsystem = subscribeRequestHeader.getSys();
             int requestCode = Integer.valueOf(subscribeRequestHeader.getCode());
-            for(SubscriptionItem item : subTopicList) {
+            for (SubscriptionItem item : subTopicList) {
                 try {
-                    Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(), requestCode);
+                    Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(),
+                        requestCode);
                 } catch (Exception e) {
-                    //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
 
                     responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                            subscribeResponseHeader,
-                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                        subscribeResponseHeader,
+                        SendMessageResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
+                                e.getMessage()));
                     asyncContext.onComplete(responseEventMeshCommand);
-                    aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+                    aclLogger
+                        .warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
                     return;
                 }
             }
@@ -135,7 +152,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
 
             for (SubscriptionItem subTopic : subTopicList) {
-                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic.getTopic());
+                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+                    .get(consumerGroup + "@" + subTopic.getTopic());
 
                 if (CollectionUtils.isEmpty(groupTopicClients)) {
                     httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
@@ -151,7 +169,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
                         idcUrls.put(client.idc, urls);
                     }
                 }
-                ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+                ConsumerGroupConf consumerGroupConf =
+                    eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
                 if (consumerGroupConf == null) {
                     // new subscription
                     consumerGroupConf = new ConsumerGroupConf(consumerGroup);
@@ -168,7 +187,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
                     consumerGroupConf.setConsumerGroupTopicConf(map);
                 } else {
                     // already subscribed
-                    Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+                    Map<String, ConsumerGroupTopicConf> map =
+                        consumerGroupConf.getConsumerGroupTopicConf();
                     for (String key : map.keySet()) {
                         if (StringUtils.equals(subTopic.getTopic(), key)) {
                             ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
@@ -191,8 +211,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             long startTime = System.currentTimeMillis();
             try {
                 // subscription relationship change notification
-                eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup),
-                        eventMeshHTTPServer.localConsumerGroupMapping);
+                eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
+                    eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
 
                 final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
                     @Override
@@ -202,26 +222,32 @@ public class SubscribeProcessor implements HttpRequestProcessor {
                                 httpLogger.debug("{}", httpCommand);
                             }
                             eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
-                            eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+                            eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+                                System.currentTimeMillis()
+                                    - asyncContext.getRequest().getReqTime());
                         } catch (Exception ex) {
+                            // ignore
                         }
                     }
                 };
 
                 responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                        EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+                    EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
                 asyncContext.onComplete(responseEventMeshCommand, handler);
             } catch (Exception e) {
                 HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                        subscribeResponseHeader,
-                        SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(),
-                                EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+                    subscribeResponseHeader,
+                    SubscribeResponseBody
+                        .buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(),
+                            EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg()
+                                + EventMeshUtil.stackTrace(e, 2)));
                 asyncContext.onComplete(err);
                 long endTime = System.currentTimeMillis();
-                httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                        endTime - startTime,
-                        JSONObject.toJSONString(subscribeRequestBody.getTopics()),
-                        subscribeRequestBody.getUrl(), e);
+                httpLogger.error(
+                    "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
+                        + "|bizSeqNo={}|uniqueId={}", endTime - startTime,
+                    JsonUtils.serialize(subscribeRequestBody.getTopics()),
+                    subscribeRequestBody.getUrl(), e);
                 eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
                 eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
             }
@@ -234,8 +260,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
     }
 
     private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
-                                      List<SubscriptionItem> subscriptionItems, String url) {
-        for(SubscriptionItem item: subscriptionItems) {
+                                List<SubscriptionItem> subscriptionItems, String url) {
+        for (SubscriptionItem item : subscriptionItems) {
             Client client = new Client();
             client.env = subscribeRequestHeader.getEnv();
             client.idc = subscribeRequestHeader.getIdc();
@@ -250,7 +276,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             String groupTopicKey = client.consumerGroup + "@" + client.topic;
 
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
-                List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+                List<Client> localClients =
+                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
                     if (StringUtils.equals(localClient.url, client.url)) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index e5feb76..df3921d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -17,18 +17,6 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.processor;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import com.alibaba.fastjson.JSONObject;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.client.UnSubscribeRequestBody;
@@ -37,6 +25,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.client.UnSubscribeRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.client.UnSubscribeResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -47,9 +36,24 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public class UnSubscribeProcessor implements HttpRequestProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger("http");
@@ -61,40 +65,50 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
     }
 
     @Override
-    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+        throws Exception {
         HttpCommand responseEventMeshCommand;
-        httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
-                EventMeshConstants.PROTOCOL_HTTP,
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
-        UnSubscribeRequestHeader unSubscribeRequestHeader = (UnSubscribeRequestHeader) asyncContext.getRequest().getHeader();
-        UnSubscribeRequestBody unSubscribeRequestBody = (UnSubscribeRequestBody) asyncContext.getRequest().getBody();
+        httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+            RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+            EventMeshConstants.PROTOCOL_HTTP,
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+        UnSubscribeRequestHeader unSubscribeRequestHeader =
+            (UnSubscribeRequestHeader) asyncContext.getRequest().getHeader();
+        UnSubscribeRequestBody unSubscribeRequestBody =
+            (UnSubscribeRequestBody) asyncContext.getRequest().getBody();
 
         UnSubscribeResponseHeader unSubscribeResponseHeader =
-                UnSubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
-                        IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
-
+            UnSubscribeResponseHeader
+                .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+                    IPUtil.getLocalAddress(),
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+                    eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
 
         //validate header
         if (StringUtils.isBlank(unSubscribeRequestHeader.getIdc())
-                || StringUtils.isBlank(unSubscribeRequestHeader.getPid())
-                || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())
-                || StringUtils.isBlank(unSubscribeRequestHeader.getSys())) {
+            || StringUtils.isBlank(unSubscribeRequestHeader.getPid())
+            || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())
+            || StringUtils.isBlank(unSubscribeRequestHeader.getSys())) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    unSubscribeResponseHeader,
-                    UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+                unSubscribeResponseHeader,
+                UnSubscribeResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
 
         //validate body
         if (StringUtils.isBlank(unSubscribeRequestBody.getUrl())
-                || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())
-                || StringUtils.isBlank(unSubscribeRequestBody.getConsumerGroup())) {
+            || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())
+            || StringUtils.isBlank(unSubscribeRequestBody.getConsumerGroup())) {
 
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                    unSubscribeResponseHeader,
-                    UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+                unSubscribeResponseHeader,
+                UnSubscribeResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
             asyncContext.onComplete(responseEventMeshCommand);
             return;
         }
@@ -115,8 +129,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                         httpLogger.debug("{}", httpCommand);
                     }
                     eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
-                    eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+                    eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+                        System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                 } catch (Exception ex) {
+                    // ignore
                 }
             }
         };
@@ -127,12 +143,14 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);
 
             for (String unSubTopic : unSubTopicList) {
-                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + unSubTopic);
+                List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+                    .get(consumerGroup + "@" + unSubTopic);
                 Iterator<Client> clientIterator = groupTopicClients.iterator();
                 while (clientIterator.hasNext()) {
                     Client client = clientIterator.next();
-                    if (StringUtils.equals(client.pid, pid) && StringUtils.equals(client.url, unSubscribeUrl)) {
-                        httpLogger.warn("client {} start unsubscribe", JSONObject.toJSONString(client));
+                    if (StringUtils.equals(client.pid, pid)
+                        && StringUtils.equals(client.url, unSubscribeUrl)) {
+                        httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
                         clientIterator.remove();
                     }
                 }
@@ -145,7 +163,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                         if (!StringUtils.equals(unSubscribeUrl, client.url)) {
                             clientUrls.add(client.url);
                             if (idcUrls.containsKey(client.idc)) {
-                                idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+                                idcUrls.get(client.idc)
+                                    .add(StringUtils.deleteWhitespace(client.url));
                             } else {
                                 List<String> urls = new ArrayList<>();
                                 urls.add(client.url);
@@ -155,15 +174,19 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
 
                     }
                     synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
-                        ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
-                        Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+                        ConsumerGroupConf consumerGroupConf =
+                            eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+                        Map<String, ConsumerGroupTopicConf> map =
+                            consumerGroupConf.getConsumerGroupTopicConf();
                         for (String topicKey : map.keySet()) {
                             // only modify the topic to subscribe
                             if (StringUtils.equals(unSubTopic, topicKey)) {
-                                ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+                                ConsumerGroupTopicConf latestTopicConf =
+                                    new ConsumerGroupTopicConf();
                                 latestTopicConf.setConsumerGroup(consumerGroup);
                                 latestTopicConf.setTopic(unSubTopic);
-                                latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
+                                latestTopicConf
+                                    .setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
                                 latestTopicConf.setUrls(clientUrls);
 
                                 latestTopicConf.setIdcUrls(idcUrls);
@@ -171,7 +194,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
                                 map.put(unSubTopic, latestTopicConf);
                             }
                         }
-                        eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
+                        eventMeshHTTPServer.localConsumerGroupMapping
+                            .put(consumerGroup, consumerGroupConf);
                     }
                 } else {
                     isChange = false;
@@ -181,51 +205,64 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
             long startTime = System.currentTimeMillis();
             if (isChange) {
                 try {
-                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup),
-                            eventMeshHTTPServer.localConsumerGroupMapping);
+                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
+                        eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
 
                     responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                            EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+                        EventMeshRetCode.SUCCESS.getRetCode(),
+                        EventMeshRetCode.SUCCESS.getErrMsg());
                     asyncContext.onComplete(responseEventMeshCommand, handler);
 
                 } catch (Exception e) {
                     HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                            unSubscribeResponseHeader,
-                            UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
-                                    EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+                        unSubscribeResponseHeader,
+                        UnSubscribeResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
+                                EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg()
+                                    + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err);
                     long endTime = System.currentTimeMillis();
-                    httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                            endTime - startTime,
-                            JSONObject.toJSONString(unSubscribeRequestBody.getTopics()),
-                            unSubscribeRequestBody.getUrl(), e);
+                    httpLogger.error(
+                        "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+                            + "|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime,
+                        JsonUtils.serialize(unSubscribeRequestBody.getTopics()),
+                        unSubscribeRequestBody.getUrl(), e);
                     eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
-                    eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
+                    eventMeshHTTPServer.metrics.summaryMetrics
+                        .recordSendMsgCost(endTime - startTime);
                 }
             } else {
                 //remove
                 try {
-                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, null, eventMeshHTTPServer.localConsumerGroupMapping);
+                    eventMeshHTTPServer.getConsumerManager()
+                        .notifyConsumerManager(consumerGroup, null);
                     responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
-                            EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+                        EventMeshRetCode.SUCCESS.getRetCode(),
+                        EventMeshRetCode.SUCCESS.getErrMsg());
                     asyncContext.onComplete(responseEventMeshCommand, handler);
                     // clean ClientInfo
-                    eventMeshHTTPServer.localClientInfoMapping.keySet().removeIf(s -> StringUtils.contains(s, consumerGroup));
+                    eventMeshHTTPServer.localClientInfoMapping.keySet()
+                        .removeIf(s -> StringUtils.contains(s, consumerGroup));
                     // clean ConsumerGroupInfo
-                    eventMeshHTTPServer.localConsumerGroupMapping.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s));
+                    eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+                        .removeIf(s -> StringUtils.equals(consumerGroup, s));
                 } catch (Exception e) {
                     HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
-                            unSubscribeResponseHeader,
-                            UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
-                                    EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+                        unSubscribeResponseHeader,
+                        UnSubscribeResponseBody
+                            .buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
+                                EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg()
+                                    + EventMeshUtil.stackTrace(e, 2)));
                     asyncContext.onComplete(err);
                     long endTime = System.currentTimeMillis();
-                    httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                            endTime - startTime,
-                            JSONObject.toJSONString(unSubscribeRequestBody.getTopics()),
-                            unSubscribeRequestBody.getUrl(), e);
+                    httpLogger.error(
+                        "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+                            + "|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime,
+                        JsonUtils.serialize(unSubscribeRequestBody.getTopics()),
+                        unSubscribeRequestBody.getUrl(), e);
                     eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
-                    eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
+                    eventMeshHTTPServer.metrics.summaryMetrics
+                        .recordSendMsgCost(endTime - startTime);
                 }
             }
         }
@@ -236,9 +273,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
         return false;
     }
 
-    private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader, String consumerGroup,
-                                        List<String> topicList, String url) {
-        for(String topic: topicList) {
+    private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader,
+                                String consumerGroup,
+                                List<String> topicList, String url) {
+        for (String topic : topicList) {
             Client client = new Client();
             client.env = unSubscribeRequestHeader.getEnv();
             client.idc = unSubscribeRequestHeader.getIdc();
@@ -252,7 +290,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
 
             String groupTopicKey = client.consumerGroup + "@" + client.topic;
             if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
-                List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+                List<Client> localClients =
+                    eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
                 boolean isContains = false;
                 for (Client localClient : localClients) {
                     if (StringUtils.equals(localClient.url, client.url)) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
index 1b834d9..58711b1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
@@ -20,10 +20,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor.inf;
 
 import java.util.Date;
 
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.commons.lang3.StringUtils;
-
 public class Client {
 
     public String env;
@@ -48,44 +44,20 @@ public class Client {
 
     public Date lastUpTime;
 
-    public static Client buildClientFromJSONObject(JSONObject jsonObject) {
-        if (jsonObject == null) {
-            return null;
-        }
-
-        Client client = null;
-        try {
-            client = new Client();
-
-            client.env = StringUtils.trim(jsonObject.getString("env"));
-            client.consumerGroup = StringUtils.trim(jsonObject.getString("groupName"));
-            client.topic = StringUtils.trim(jsonObject.getString("topic"));
-            client.url = StringUtils.trim(jsonObject.getString("url"));
-            client.sys = StringUtils.trim(jsonObject.getString("sys"));
-            client.idc = StringUtils.trim(jsonObject.getString("idc"));
-            client.ip = StringUtils.trim(jsonObject.getString("ip"));
-            client.pid = StringUtils.trim(jsonObject.getString("pid"));
-            client.hostname = StringUtils.trim(jsonObject.getString("hostname"));
-            client.apiVersion = StringUtils.trim(jsonObject.getString("apiversion"));
-        } catch (Exception ex) {
-        }
-        return client;
-    }
-
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("endPoint={env=").append(env)
-                .append(",idc=").append(idc)
-                .append(",consumerGroup=").append(consumerGroup)
-                .append(",topic=").append(topic)
-                .append(",url=").append(url)
-                .append(",sys=").append(sys)
-                .append(",ip=").append(ip)
-                .append(",pid=").append(pid)
-                .append(",hostname=").append(hostname)
-                .append(",apiVersion=").append(apiVersion)
-                .append(",registerTime=").append("}");
+            .append(",idc=").append(idc)
+            .append(",consumerGroup=").append(consumerGroup)
+            .append(",topic=").append(topic)
+            .append(",url=").append(url)
+            .append(",sys=").append(sys)
+            .append(",ip=").append(ip)
+            .append(",pid=").append(pid)
+            .append(",hostname=").append(hostname)
+            .append(",apiVersion=").append(apiVersion)
+            .append(",registerTime=").append("}");
         return sb.toString();
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index f50c3ee..e0e0349 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -17,33 +17,23 @@
 
 package org.apache.eventmesh.runtime.core.protocol.http.push;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.RandomStringUtil;
+import org.apache.eventmesh.common.exception.JsonException;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
 import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
 import org.apache.eventmesh.runtime.util.OMSUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.NameValuePair;
@@ -52,9 +42,21 @@ import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Sets;
+
 public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
     public Logger messageLogger = LoggerFactory.getLogger("message");
@@ -67,7 +69,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
     public String currPushUrl;
 
-    public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
+    public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
+                                Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
         super(handleMsgContext);
         this.waitingRequests = waitingRequests;
     }
@@ -94,37 +97,50 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode);
         builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
         builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
-        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshCluster);
+        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
+            handleMsgContext.getEventMeshHTTPServer()
+                .getEventMeshHttpConfiguration().eventMeshCluster);
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtil.getLocalAddress());
-        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
-        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
+            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
+            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
 
-        handleMsgContext.getMsg().getUserProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+        handleMsgContext.getMsg().getUserProperties()
+            .put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+                String.valueOf(System.currentTimeMillis()));
 
         String content = "";
         try {
-            content = new String(handleMsgContext.getMsg().getBody(), EventMeshConstants.DEFAULT_CHARSET);
+            content =
+                new String(handleMsgContext.getMsg().getBody(), EventMeshConstants.DEFAULT_CHARSET);
         } catch (Exception ex) {
             return;
         }
 
-        List<NameValuePair> body = new ArrayList<NameValuePair>();
+        List<NameValuePair> body = new ArrayList<>();
         body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
         if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
-            body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, RandomStringUtil.generateNum(20)));
+            body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
+                RandomStringUtil.generateNum(20)));
         } else {
-            body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, handleMsgContext.getBizSeqNo()));
+            body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
+                handleMsgContext.getBizSeqNo()));
         }
         if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
-            body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, RandomStringUtil.generateNum(20)));
+            body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
+                RandomStringUtil.generateNum(20)));
         } else {
-            body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, handleMsgContext.getUniqueId()));
+            body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
+                handleMsgContext.getUniqueId()));
         }
 
-        body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO, handleMsgContext.getMsgRandomNo()));
+        body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
+            handleMsgContext.getMsgRandomNo()));
         body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
 
-        body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, JSON.toJSONString(OMSUtil.getMessageProp(handleMsgContext.getMsg()))));
+        body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
+            JsonUtils.serialize(OMSUtil.getMessageProp(handleMsgContext.getMsg()))));
 
         try {
             builder.setEntity(new UrlEncodedFormEntity(body));
@@ -139,7 +155,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         addToWaitingMap(this);
 
         cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
-                IPUtil.getLocalAddress(), currPushUrl);
+            IPUtil.getLocalAddress(), currPushUrl);
 
         try {
             httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
@@ -150,8 +166,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                     eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPPushTimeCost(cost);
                     if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                         eventMeshHTTPServer.metrics.summaryMetrics.recordHttpPushMsgFailed();
-                        messageLogger.info("message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
-                                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        messageLogger.info(
+                            "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+                                + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
 
                         delayRetry();
                         if (isComplete()) {
@@ -160,14 +178,18 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                     } else {
                         String res = "";
                         try {
-                            res = EntityUtils.toString(response.getEntity(), Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+                            res = EntityUtils.toString(response.getEntity(),
+                                Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
                         } catch (IOException e) {
                             handleMsgContext.finish();
                             return new Object();
                         }
                         ClientRetCode result = processResponseContent(res);
-                        messageLogger.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", result, currPushUrl, handleMsgContext.getTopic(),
-                                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                        messageLogger.info(
+                            "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+                                + "|uniqueId={}|cost={}",
+                            result, currPushUrl, handleMsgContext.getTopic(),
+                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
                         if (result == ClientRetCode.OK) {
                             complete();
                             if (isComplete()) {
@@ -195,10 +217,13 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             });
 
             if (messageLogger.isDebugEnabled()) {
-                messageLogger.debug("message|eventMesh2client|url={}|topic={}|msg={}", currPushUrl, handleMsgContext.getTopic(),
-                        handleMsgContext.getMsg());
+                messageLogger.debug("message|eventMesh2client|url={}|topic={}|msg={}", currPushUrl,
+                    handleMsgContext.getTopic(),
+                    handleMsgContext.getMsg());
             } else {
-                messageLogger.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}", currPushUrl, handleMsgContext.getTopic(),
+                messageLogger
+                    .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+                        currPushUrl, handleMsgContext.getTopic(),
                         handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
             }
         } catch (IOException e) {
@@ -215,13 +240,16 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("asyncPushRequest={")
-                .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
-                .append(",startIdx=").append(startIdx)
-                .append(",retryTimes=").append(retryTimes)
-                .append(",uniqueId=").append(handleMsgContext.getUniqueId())
-                .append(",executeTime=").append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
-                .append(",lastPushTime=").append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
-                .append(",createTime=").append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+            .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+            .append(",startIdx=").append(startIdx)
+            .append(",retryTimes=").append(retryTimes)
+            .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+            .append(",executeTime=")
+            .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+            .append(",lastPushTime=")
+            .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+            .append(",createTime=")
+            .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
         return sb.toString();
     }
 
@@ -231,21 +259,26 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         }
 
         try {
-            JSONObject ret = JSONObject.parseObject(content);
-            Integer retCode = ret.getInteger("retCode");
+            Map<String, Object> ret =
+                JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+                });
+            Integer retCode = (Integer) ret.get("retCode");
             if (retCode != null && ClientRetCode.contains(retCode)) {
                 return ClientRetCode.get(retCode);
             }
 
             return ClientRetCode.FAIL;
         } catch (NumberFormatException e) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
-        } catch (JSONException e) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+        } catch (JsonException e) {
+            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
         } catch (Throwable t) {
-            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+            messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
         }
     }
@@ -255,7 +288,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
             return;
         }
-        waitingRequests.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+        waitingRequests
+            .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
         waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
         return;
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 87f7b9b..02c2388 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,16 +17,13 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
 
-import com.alibaba.fastjson.JSON;
-import io.openmessaging.api.*;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.api.EventMeshAction;
 import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
 import org.apache.eventmesh.api.RRCallback;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -40,15 +37,31 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStre
 import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.HttpTinyClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.openmessaging.api.AsyncConsumeContext;
+import io.openmessaging.api.AsyncMessageListener;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+
 public class ClientGroupWrapper {
 
     public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
@@ -88,7 +101,8 @@ public class ClientGroupWrapper {
 
     private MQConsumerWrapper broadCastMsgConsumer;
 
-    private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
+    private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
+        new ConcurrentHashMap<String, Set<Session>>();
 
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
@@ -105,9 +119,12 @@ public class ClientGroupWrapper {
         this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
         this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
-        this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
-        this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
-        this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.persistentMsgConsumer = new MQConsumerWrapper(
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.broadCastMsgConsumer = new MQConsumerWrapper(
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.mqProducerWrapper = new MQProducerWrapper(
+            eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
     }
 
     public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -128,13 +145,14 @@ public class ClientGroupWrapper {
         return has;
     }
 
-    public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback) throws Exception {
+    public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback)
+        throws Exception {
         mqProducerWrapper.send(upStreamMsgContext.getMsg(), sendCallback);
         return true;
     }
 
     public void request(UpStreamMsgContext upStreamMsgContext, RRCallback rrCallback, long timeout)
-            throws Exception {
+        throws Exception {
         mqProducerWrapper.request(upStreamMsgContext.getMsg(), rrCallback, timeout);
     }
 
@@ -147,10 +165,12 @@ public class ClientGroupWrapper {
 
             @Override
             public void onException(OnExceptionContext context) {
-                String bizSeqNo = upStreamMsgContext.getMsg().getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
+                String bizSeqNo = upStreamMsgContext.getMsg()
+                    .getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
                 logger.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
-                        upStreamMsgContext.getMsg().getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), bizSeqNo,
-                        upStreamMsgContext.getSession().getClient(), context.getException());
+                    upStreamMsgContext.getMsg()
+                        .getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), bizSeqNo,
+                    upStreamMsgContext.getSession().getClient(), context.getException());
             }
         });
         return true;
@@ -161,7 +181,8 @@ public class ClientGroupWrapper {
     }
 
     public boolean addSubscription(String topic, Session session) throws Exception {
-        if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+        if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
             logger.error("addSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
@@ -175,12 +196,16 @@ public class ClientGroupWrapper {
             }
             r = topic2sessionInGroupMapping.get(topic).add(session);
             if (r) {
-                logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+                logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup,
+                    topic, session.getClient());
             } else {
-                logger.warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+                logger
+                    .warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic,
+                        session.getClient());
             }
         } catch (Exception e) {
-            logger.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+            logger
+                .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
             throw new Exception("addSubscription fail");
         } finally {
             this.groupLock.writeLock().unlock();
@@ -190,7 +215,8 @@ public class ClientGroupWrapper {
 
     public boolean removeSubscription(String topic, Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(consumerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
             logger.error("removeSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
@@ -201,17 +227,23 @@ public class ClientGroupWrapper {
             if (topic2sessionInGroupMapping.containsKey(topic)) {
                 r = topic2sessionInGroupMapping.get(topic).remove(session);
                 if (r) {
-                    logger.info("removeSubscription remove session success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+                    logger.info(
+                        "removeSubscription remove session success, group:{} topic:{} client:{}",
+                        consumerGroup, topic, session.getClient());
                 } else {
-                    logger.warn("removeSubscription remove session failed, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+                    logger.warn(
+                        "removeSubscription remove session failed, group:{} topic:{} client:{}",
+                        consumerGroup, topic, session.getClient());
                 }
             }
             if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
                 topic2sessionInGroupMapping.remove(topic);
-                logger.info("removeSubscription remove topic success, group:{} topic:{}", consumerGroup, topic);
+                logger.info("removeSubscription remove topic success, group:{} topic:{}",
+                    consumerGroup, topic);
             }
         } catch (Exception e) {
-            logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+            logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
+                e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -224,9 +256,9 @@ public class ClientGroupWrapper {
         }
 
         Properties keyValue = new Properties();
-//        KeyValue keyValue = OMS.newKeyValue();
         keyValue.put("producerGroup", producerGroup);
-        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
+        keyValue.put("instanceName", EventMeshUtil
+            .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
 
         //TODO for defibus
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
@@ -264,7 +296,8 @@ public class ClientGroupWrapper {
 
     public boolean addGroupConsumerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(consumerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
             logger.error("addGroupConsumerSession param error,session:{}", session);
             return false;
         }
@@ -274,10 +307,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupConsumerSessions.add(session);
             if (r) {
-                logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
+                logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
+            logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -286,7 +321,8 @@ public class ClientGroupWrapper {
 
     public boolean addGroupProducerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(producerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
             logger.error("addGroupProducerSession param error,session:{}", session);
             return false;
         }
@@ -296,10 +332,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupProducerSessions.add(session);
             if (r) {
-                logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
+                logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
+            logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -308,7 +346,8 @@ public class ClientGroupWrapper {
 
     public boolean removeGroupConsumerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(consumerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
             logger.error("removeGroupConsumerSession param error,session:{}", session);
             return false;
         }
@@ -318,10 +357,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupConsumerSessions.remove(session);
             if (r) {
-                logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
+                logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
+            logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -330,7 +371,8 @@ public class ClientGroupWrapper {
 
     public boolean removeGroupProducerSession(Session session) {
         if (session == null
-                || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+            || !StringUtils.equalsIgnoreCase(producerGroup,
+            EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
             logger.error("removeGroupProducerSession param error,session:{}", session);
             return false;
         }
@@ -340,10 +382,12 @@ public class ClientGroupWrapper {
             this.groupLock.writeLock().lockInterruptibly();
             r = groupProducerSessions.remove(session);
             if (r) {
-                logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
+                logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup,
+                    session.getClient());
             }
         } catch (Exception e) {
-            logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
+            logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup,
+                session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
         }
@@ -360,73 +404,11 @@ public class ClientGroupWrapper {
         keyValue.put("isBroadcast", "false");
         keyValue.put("consumerGroup", consumerGroup);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
-        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+        keyValue.put("instanceName", EventMeshUtil
+            .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
 
         persistentMsgConsumer.init(keyValue);
-//        persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
-//
-//            @Override
-//            public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
-//
-//            if (msg == null)
-//                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//
-//            eventMeshTcpMonitor.getMq2eventMeshMsgNum().incrementAndGet();
-//            String topic = msg.getTopic();
-//            msg.putUserProperty(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-//            msg.putUserProperty(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, accessConfiguration.eventMeshServerIp);
-//            msg.putUserProperty(EventMeshConstants.BORN_TIMESTAMP, String.valueOf(msg.getBornTimestamp()));
-//            msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp()));
-//
-//            if (!EventMeshUtil.isValidRMBTopic(topic)) {
-//                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//            }
-//
-//            Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
-//            String bizSeqNo = EventMeshUtil.getMessageBizSeq(msg);
-//            if(session == null){
-//                try {
-//                    Integer sendBackTimes = MapUtils.getInteger(msg.getProperties(), EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, new Integer(0));
-//                    String sendBackFromEventMeshIp = MapUtils.getString(msg.getProperties(), EventMeshConstants.EVENTMESH_SEND_BACK_IP, "");
-//                    logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}", groupName, topic, bizSeqNo);
-//
-//                    if (sendBackTimes >= eventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackMaxTimes) {
-//                        logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackMaxTimes, groupName, topic, bizSeqNo);
-//                    } else {
-//                        sendBackTimes++;
-//                        msg.putUserProperty(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
-//                        msg.putUserProperty(EventMeshConstants.EVENTMESH_SEND_BACK_IP, sendBackFromEventMeshIp);
-//                        sendMsgBackToBroker(msg, bizSeqNo);
-//                    }
-//                } catch (Exception e){
-//                    logger.warn("handle msg exception when no session found", e);
-//                }
-//
-//                return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//            }
-//
-//            DownStreamMsgContext downStreamMsgContext =
-//                    new DownStreamMsgContext(msg, session, persistentMsgConsumer, (EventMeshConsumeConcurrentlyContext)context, false);
-//
-//            if(downstreamMap.size() < eventMeshTCPServer.getAccessConfiguration().eventMeshTcpDownStreamMapSize){
-//                downstreamMap.putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
-//            }else{
-//                logger.warn("downStreamMap is full,group:{}", groupName);
-//            }
-//
-//            if (session.isCanDownStream()) {
-//                session.downstreamMsg(downStreamMsgContext);
-//                return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-//            }
-//
-//            logger.warn("session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), bizSeqNo);
-//            long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-//            downStreamMsgContext.delay(delayTime);
-//            eventMeshTcpRetryer.pushRetry(downStreamMsgContext);
-//
-//            return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-//            }
-//        });
+
         inited4Persistent.compareAndSet(false, true);
         logger.info("init persistentMsgConsumer success, group:{}", consumerGroup);
     }
@@ -449,59 +431,10 @@ public class ClientGroupWrapper {
         keyValue.put("isBroadcast", "true");
         keyValue.put("consumerGroup", consumerGroup);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
-        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+        keyValue.put("instanceName", EventMeshUtil
+            .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
         broadCastMsgConsumer.init(keyValue);
-//        broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
-//            @Override
-//            public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
-//                if (msg == null)
-//                    return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//
-//                eventMeshTcpMonitor.getMq2eventMeshMsgNum().incrementAndGet();
-//
-//                String topic = msg.getTopic();
-//
-//                msg.putUserProperty(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-//                msg.putUserProperty(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, accessConfiguration.eventMeshServerIp);
-//                msg.putUserProperty(EventMeshConstants.BORN_TIMESTAMP, String.valueOf(msg.getBornTimestamp()));
-//                msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp()));
-//
-//                if (!EventMeshUtil.isValidRMBTopic(topic)) {
-//                    return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//                }
-//
-//                if(CollectionUtils.isEmpty(groupConsumerSessions)){
-//                    logger.warn("found no session to downstream broadcast msg");
-//                    return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//                }
-//
-//                Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
-//
-//                while (sessionsItr.hasNext()) {
-//                    Session session = sessionsItr.next();
-//
-//                    if (!session.isAvailable(topic)) {
-//                        logger.warn("downstream broadcast msg,session is not available,client:{}",session.getClient());
-//                        continue;
-//                    }
-//
-//                    DownStreamMsgContext downStreamMsgContext =
-//                            new DownStreamMsgContext(msg, session, broadCastMsgConsumer, context, false);
-//
-//                    if (session.isCanDownStream()) {
-//                        session.downstreamMsg(downStreamMsgContext);
-//                        continue;
-//                    }
-//
-//                    logger.warn("downstream broadcast msg,session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-//                    long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-//                    downStreamMsgContext.delay(delayTime);
-//                    eventMeshTcpRetryer.pushRetry(downStreamMsgContext);
-//                }
-//
-//                return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-//            }
-//        });
+
         inited4Broadcast.compareAndSet(false, true);
         logger.info("init broadCastMsgConsumer success, group:{}", consumerGroup);
     }
@@ -523,15 +456,17 @@ public class ClientGroupWrapper {
                 public void consume(Message message, AsyncConsumeContext context) {
 
                     eventMeshTcpMonitor.getMq2EventMeshMsgNum().incrementAndGet();
-                    String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-                    message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-                    message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
-
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+                    String topic =
+                        message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+                    message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()));
+                    message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshTCPConfiguration.eventMeshServerIp);
+
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+                        (EventMeshAsyncConsumeContext) context;
                     if (CollectionUtils.isEmpty(groupConsumerSessions)) {
                         logger.warn("found no session to downstream broadcast msg");
-//                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-//                        context.ack();
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     }
@@ -539,31 +474,35 @@ public class ClientGroupWrapper {
                     Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, null, broadCastMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
+                        new DownStreamMsgContext(message, null, broadCastMsgConsumer,
+                            eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                            subscriptionItem);
 
                     while (sessionsItr.hasNext()) {
                         Session session = sessionsItr.next();
 
                         if (!session.isAvailable(topic)) {
-                            logger.warn("downstream broadcast msg,session is not available,client:{}", session.getClient());
+                            logger
+                                .warn("downstream broadcast msg,session is not available,client:{}",
+                                    session.getClient());
                             continue;
                         }
 
                         downStreamMsgContext.session = session;
 
                         //downstream broadcast msg asynchronously
-                        eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService().submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                //msg put in eventmesh,waiting client ack
-                                session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
-                                session.downstreamMsg(downStreamMsgContext);
-                            }
-                        });
+                        eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+                            .submit(new Runnable() {
+                                @Override
+                                public void run() {
+                                    //msg put in eventmesh,waiting client ack
+                                    session.getPusher()
+                                        .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+                                    session.downstreamMsg(downStreamMsgContext);
+                                }
+                            });
                     }
 
-//                    context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-//                    context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 }
             };
@@ -573,51 +512,72 @@ public class ClientGroupWrapper {
                 @Override
                 public void consume(Message message, AsyncConsumeContext context) {
                     eventMeshTcpMonitor.getMq2EventMeshMsgNum().incrementAndGet();
-                    String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
-                    message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-                    message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
-
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
-                    Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
+                    String topic =
+                        message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+                    message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                        String.valueOf(System.currentTimeMillis()));
+                    message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                        eventMeshTCPConfiguration.eventMeshServerIp);
+
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+                        (EventMeshAsyncConsumeContext) context;
+                    Session session = downstreamDispatchStrategy
+                        .select(consumerGroup, topic, groupConsumerSessions);
                     String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
                     if (session == null) {
                         try {
                             Integer sendBackTimes = new Integer(0);
                             String sendBackFromEventMeshIp = "";
-                            if (StringUtils.isNotBlank(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES))) {
-                                sendBackTimes = Integer.valueOf(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES));
+                            if (StringUtils.isNotBlank(message.getSystemProperties(
+                                EventMeshConstants.EVENTMESH_SEND_BACK_TIMES))) {
+                                sendBackTimes = Integer.valueOf(message.getSystemProperties(
+                                    EventMeshConstants.EVENTMESH_SEND_BACK_TIMES));
                             }
-                            if (StringUtils.isNotBlank(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP))) {
-                                sendBackFromEventMeshIp = message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+                            if (StringUtils.isNotBlank(message
+                                .getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP))) {
+                                sendBackFromEventMeshIp = message
+                                    .getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
                             }
 
-                            logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", consumerGroup, topic, bizSeqNo, sendBackTimes, sendBackFromEventMeshIp);
-
-                            if (sendBackTimes >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
-                                logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes, consumerGroup, topic, bizSeqNo);
+                            logger.error(
+                                "found no session to downstream msg,groupName:{}, topic:{}, "
+                                    + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+                                consumerGroup, topic, bizSeqNo, sendBackTimes,
+                                sendBackFromEventMeshIp);
+
+                            if (sendBackTimes >= eventMeshTCPServer
+                                .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+                                logger.error(
+                                    "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+                                        + "bizSeqNo:{}", eventMeshTCPServer
+                                        .getEventMeshTCPConfiguration()
+                                        .eventMeshTcpSendBackMaxTimes,
+                                    consumerGroup, topic, bizSeqNo);
                             } else {
                                 sendBackTimes++;
-                                message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
-                                message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_IP, eventMeshTCPConfiguration.eventMeshServerIp);
+                                message.getSystemProperties()
+                                    .put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+                                        sendBackTimes.toString());
+                                message.getSystemProperties()
+                                    .put(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+                                        eventMeshTCPConfiguration.eventMeshServerIp);
                                 sendMsgBackToBroker(message, bizSeqNo);
                             }
                         } catch (Exception e) {
                             logger.warn("handle msg exception when no session found", e);
                         }
 
-//                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-//                        context.ack();
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     }
 
                     DownStreamMsgContext downStreamMsgContext =
-                            new DownStreamMsgContext(message, session, persistentMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
+                        new DownStreamMsgContext(message, session, persistentMsgConsumer,
+                            eventMeshAsyncConsumeContext.getAbstractContext(), false,
+                            subscriptionItem);
                     //msg put in eventmesh,waiting client ack
                     session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                     session.downstreamMsg(downStreamMsgContext);
-//                    context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-//                    context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 }
             };
@@ -690,7 +650,8 @@ public class ClientGroupWrapper {
         return downstreamDispatchStrategy;
     }
 
-    public void setDownstreamDispatchStrategy(DownstreamDispatchStrategy downstreamDispatchStrategy) {
+    public void setDownstreamDispatchStrategy(
+        DownstreamDispatchStrategy downstreamDispatchStrategy) {
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
     }
 
@@ -700,23 +661,25 @@ public class ClientGroupWrapper {
 
     private String pushMsgToEventMesh(Message msg, String ip, int port) throws Exception {
         StringBuilder targetUrl = new StringBuilder();
-        targetUrl.append("http://").append(ip).append(":").append(port).append("/eventMesh/msg/push");
+        targetUrl.append("http://").append(ip).append(":").append(port)
+            .append("/eventMesh/msg/push");
         HttpTinyClient.HttpResult result = null;
 
         try {
-            logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl.toString(), msg.toString());
+            logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl.toString(),
+                msg.toString());
             List<String> paramValues = new ArrayList<String>();
             paramValues.add("msg");
-            paramValues.add(JSON.toJSONString(msg));
+            paramValues.add(JsonUtils.serialize(msg));
             paramValues.add("group");
             paramValues.add(consumerGroup);
 
             result = HttpTinyClient.httpPost(
-                    targetUrl.toString(),
-                    null,
-                    paramValues,
-                    "UTF-8",
-                    3000);
+                targetUrl.toString(),
+                null,
+                paramValues,
+                "UTF-8",
+                3000);
         } catch (Exception e) {
             logger.error("httpPost " + targetUrl + " is fail,", e);
             //throw new RuntimeException("httpPost " + targetUrl + " is fail," , e);
@@ -727,7 +690,8 @@ public class ClientGroupWrapper {
             return result.content;
 
         } else {
-            throw new Exception("httpPost targetUrl[" + targetUrl + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
+            throw new Exception("httpPost targetUrl[" + targetUrl
+                + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
         }
     }
 
@@ -742,22 +706,23 @@ public class ClientGroupWrapper {
 
             long startTime = System.currentTimeMillis();
             long taskExcuteTime = startTime;
-            send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime), new SendCallback() {
-                @Override
-                public void onSuccess(SendResult sendResult) {
-                    logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
-                }
+            send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime),
+                new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        logger.info(
+                            "consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+                                + "topic:{}", consumerGroup, bizSeqNo, topic);
+                    }
 
-                @Override
-                public void onException(OnExceptionContext context) {
-                    logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
-                }
+                    @Override
+                    public void onException(OnExceptionContext context) {
+                        logger.warn(
+                            "consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+                                + " topic:{}", consumerGroup, bizSeqNo, topic);
+                    }
 
-//                @Override
-//                public void onException(Throwable e) {
-//                    logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", groupName, bizSeqNo, topic);
-//                }
-            });
+                });
             eventMeshTcpMonitor.getEventMesh2mqMsgNum().incrementAndGet();
         } catch (Exception e) {
             logger.warn("try send msg back to broker failed");
diff --git a/eventmesh-schema-registry/build.gradle b/eventmesh-schema-registry/build.gradle
index 96a33a1..6a9c064 100644
--- a/eventmesh-schema-registry/build.gradle
+++ b/eventmesh-schema-registry/build.gradle
@@ -15,19 +15,3 @@
  * limitations under the License.
  */
 
-plugins {
-}
-
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
-
-repositories {
-    mavenCentral()
-}
-
-dependencies {
-}
-
-test {
-    useJUnitPlatform()
-}
diff --git a/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle b/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
index d97e5d8..15e630e 100644
--- a/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
+++ b/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
@@ -15,9 +15,6 @@
  * limitations under the License.
  */
 
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
-
 configurations {
     compileOnly {
         extendsFrom annotationProcessor
diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle
index c11729d..e57a607 100644
--- a/eventmesh-sdk-java/build.gradle
+++ b/eventmesh-sdk-java/build.gradle
@@ -17,5 +17,23 @@
 
 dependencies {
     implementation project(":eventmesh-common")
+    implementation project(":eventmesh-common")
+
+    implementation "com.fasterxml.jackson.core:jackson-databind"
+    implementation "com.fasterxml.jackson.core:jackson-core"
+    implementation "com.fasterxml.jackson.core:jackson-annotations"
+
+    implementation "io.netty:netty-all"
+    implementation "org.apache.httpcomponents:httpclient"
+
     testImplementation project(":eventmesh-common")
+    testImplementation project(":eventmesh-common")
+
+    testImplementation "com.fasterxml.jackson.core:jackson-databind"
+    testImplementation "com.fasterxml.jackson.core:jackson-core"
+    testImplementation "com.fasterxml.jackson.core:jackson-annotations"
+
+    testImplementation "io.netty:netty-all"
+    testImplementation "org.apache.httpcomponents:httpclient"
+
 }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index 61d3c48..7915d40 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -17,25 +17,6 @@
 
 package org.apache.eventmesh.client.http.consumer;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.*;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import io.netty.handler.codec.http.HttpMethod;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.eventmesh.client.http.AbstractLiteClient;
 import org.apache.eventmesh.client.http.EventMeshRetObj;
 import org.apache.eventmesh.client.http.RemotingServer;
@@ -57,12 +38,28 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import io.netty.handler.codec.http.HttpMethod;
+
 public class LiteConsumer extends AbstractLiteClient {
 
     public Logger logger = LoggerFactory.getLogger(LiteConsumer.class);
@@ -77,11 +74,13 @@ public class LiteConsumer extends AbstractLiteClient {
 
     private LiteMessageListener messageListener;
 
-    protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
+    protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4,
+        new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
 
     public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception {
         super(liteClientConfig);
-        this.consumeExecutor = ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(),
+        this.consumeExecutor =
+            ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(),
                 liteClientConfig.getConsumeThreadMax(), "eventMesh-client-consume-");
         this.eventMeshClientConfig = liteClientConfig;
 //        this.remotingServer = new RemotingServer(10106, consumeExecutor);
@@ -100,7 +99,8 @@ public class LiteConsumer extends AbstractLiteClient {
 
     @Override
     public void start() throws Exception {
-        Preconditions.checkState(eventMeshClientConfig != null, "eventMeshClientConfig can't be null");
+        Preconditions
+            .checkState(eventMeshClientConfig != null, "eventMeshClientConfig can't be null");
         Preconditions.checkState(consumeExecutor != null, "consumeExecutor can't be null");
 //        Preconditions.checkState(messageListener != null, "messageListener can't be null");
         logger.info("LiteConsumer starting");
@@ -134,15 +134,18 @@ public class LiteConsumer extends AbstractLiteClient {
         String target = selectEventMesh();
         String subRes = "";
 
-        try (CloseableHttpClient httpClient = setHttpClient()){
+        try (CloseableHttpClient httpClient = setHttpClient()) {
             subRes = HttpUtil.post(httpClient, target, subscribeParam);
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, rtn:{}", target, System.currentTimeMillis() - startTime, JSON.toJSONString(subscribeParam), subRes);
+            logger.debug(
+                "subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, "
+                    + "rtn:{}", target, System.currentTimeMillis() - startTime,
+                JsonUtils.serialize(subscribeParam), subRes);
         }
 
-        EventMeshRetObj ret = JSON.parseObject(subRes, EventMeshRetObj.class);
+        EventMeshRetObj ret = JsonUtils.deserialize(subRes, EventMeshRetObj.class);
 
         if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
             return Boolean.TRUE;
@@ -152,53 +155,58 @@ public class LiteConsumer extends AbstractLiteClient {
 
     }
 
-    private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList, String url) {
+    private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList,
+                                                       String url) {
 //        final LiteMessage liteMessage = new LiteMessage();
 //        liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30))
 //                .setContent("subscribe message")
 //                .setUniqueId(RandomStringUtils.randomNumeric(30));
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.SUBSCRIBE.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
-                .addBody(SubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
-                .addBody(SubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
-                .addBody(SubscribeRequestBody.URL, url);
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.SUBSCRIBE.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+            .addBody(SubscribeRequestBody.TOPIC, JsonUtils.serialize(topicList))
+            .addBody(SubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+            .addBody(SubscribeRequestBody.URL, url);
         return requestParam;
     }
 
     private RequestParam generateHeartBeatRequestParam(List<SubscriptionItem> topics, String url) {
         List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = new ArrayList<>();
         for (SubscriptionItem item : topics) {
-            HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity();
+            HeartbeatRequestBody.HeartbeatEntity heartbeatEntity =
+                new HeartbeatRequestBody.HeartbeatEntity();
             heartbeatEntity.topic = item.getTopic();
             heartbeatEntity.url = url;
             heartbeatEntities.add(heartbeatEntity);
         }
 
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.HEARTBEAT.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
-                .addBody(HeartbeatRequestBody.CLIENTTYPE, ClientType.SUB.name())
-                .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
-                .addBody(HeartbeatRequestBody.HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.HEARTBEAT.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+            .addBody(HeartbeatRequestBody.CLIENTTYPE, ClientType.SUB.name())
+            .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+            .addBody(HeartbeatRequestBody.HEARTBEATENTITIES,
+                JsonUtils.serialize(heartbeatEntities));
         return requestParam;
     }
 
@@ -221,10 +229,12 @@ public class LiteConsumer extends AbstractLiteClient {
                     }
 
                     if (logger.isDebugEnabled()) {
-                        logger.debug("heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}", target, System.currentTimeMillis() - startTime, res);
+                        logger.debug(
+                            "heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}",
+                            target, System.currentTimeMillis() - startTime, res);
                     }
 
-                    EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+                    EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
 
                     if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
                         throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
@@ -239,7 +249,7 @@ public class LiteConsumer extends AbstractLiteClient {
     public boolean unsubscribe(List<String> topicList, String url) throws Exception {
         Set<String> unSub = new HashSet<>(topicList);
         Iterator<SubscriptionItem> itr = subscription.iterator();
-        while(itr.hasNext()) {
+        while (itr.hasNext()) {
             SubscriptionItem item = itr.next();
             if (unSub.contains(item.getTopic())) {
                 itr.remove();
@@ -257,10 +267,13 @@ public class LiteConsumer extends AbstractLiteClient {
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, rtn:{}", target, System.currentTimeMillis() - startTime, JSON.toJSONString(unSubscribeParam), unSubRes);
+            logger.debug(
+                "unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, "
+                    + "rtn:{}", target, System.currentTimeMillis() - startTime,
+                JsonUtils.serialize(unSubscribeParam), unSubRes);
         }
 
-        EventMeshRetObj ret = JSON.parseObject(unSubRes, EventMeshRetObj.class);
+        EventMeshRetObj ret = JsonUtils.deserialize(unSubRes, EventMeshRetObj.class);
 
         if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
             return Boolean.TRUE;
@@ -271,24 +284,26 @@ public class LiteConsumer extends AbstractLiteClient {
 
     private RequestParam generateUnSubscribeRequestParam(List<String> topicList, String url) {
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
-                .addBody(UnSubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
-                .addBody(UnSubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
-                .addBody(UnSubscribeRequestBody.URL, url);
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+            .addBody(UnSubscribeRequestBody.TOPIC, JsonUtils.serialize(topicList))
+            .addBody(UnSubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+            .addBody(UnSubscribeRequestBody.URL, url);
         return requestParam;
     }
 
-    public void registerMessageListener(LiteMessageListener messageListener) throws EventMeshException {
+    public void registerMessageListener(LiteMessageListener messageListener)
+        throws EventMeshException {
         this.messageListener = messageListener;
         remotingServer.registerMessageListener(this.messageListener);
     }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
index a240b95..376e6e6 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
@@ -17,29 +17,11 @@
 
 package org.apache.eventmesh.client.http.producer;
 
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Preconditions;
-
-import io.netty.handler.codec.http.HttpMethod;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.client.http.AbstractLiteClient;
 import org.apache.eventmesh.client.http.EventMeshRetObj;
 import org.apache.eventmesh.client.http.conf.LiteClientConfig;
 import org.apache.eventmesh.client.http.http.HttpUtil;
 import org.apache.eventmesh.client.http.http.RequestParam;
-import org.apache.eventmesh.client.http.ssl.MyX509TrustManager;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.EventMeshException;
 import org.apache.eventmesh.common.LiteMessage;
@@ -49,14 +31,21 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
+import io.netty.handler.codec.http.HttpMethod;
+
 public class LiteProducer extends AbstractLiteClient {
 
     public Logger logger = LoggerFactory.getLogger(LiteProducer.class);
@@ -70,7 +59,8 @@ public class LiteProducer extends AbstractLiteClient {
     @Override
     public void start() throws Exception {
         Preconditions.checkState(liteClientConfig != null, "liteClientConfig can't be null");
-        Preconditions.checkState(liteClientConfig.getLiteEventMeshAddr() != null, "liteClientConfig.liteServerAddr can't be null");
+        Preconditions.checkState(liteClientConfig.getLiteEventMeshAddr() != null,
+            "liteClientConfig.liteServerAddr can't be null");
         if (started.get()) {
             return;
         }
@@ -100,27 +90,29 @@ public class LiteProducer extends AbstractLiteClient {
             start();
         }
         Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
-                "eventMeshMessage[topic] invalid");
+            "eventMeshMessage[topic] invalid");
         Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
-                "eventMeshMessage[content] invalid");
+            "eventMeshMessage[content] invalid");
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
-                .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
-                .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
-                .addBody(SendMessageRequestBody.CONTENT, message.getContent())
-                .addBody(SendMessageRequestBody.TTL, message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
-                .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
-                .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+            .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+            .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+            .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+            .addBody(SendMessageRequestBody.TTL,
+                message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
+            .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+            .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
 
         long startTime = System.currentTimeMillis();
         String target = selectEventMesh();
@@ -132,10 +124,10 @@ public class LiteProducer extends AbstractLiteClient {
 
         if (logger.isDebugEnabled()) {
             logger.debug("publish async message, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
-                    target, System.currentTimeMillis() - startTime, message, res);
+                target, System.currentTimeMillis() - startTime, message, res);
         }
 
-        EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+        EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
 
         if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
             return Boolean.TRUE;
@@ -157,27 +149,28 @@ public class LiteProducer extends AbstractLiteClient {
             start();
         }
         Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
-                "eventMeshMessage[topic] invalid");
+            "eventMeshMessage[topic] invalid");
         Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
-                "eventMeshMessage[content] invalid");
+            "eventMeshMessage[content] invalid");
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(timeout)
-                .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
-                .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
-                .addBody(SendMessageRequestBody.CONTENT, message.getContent())
-                .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
-                .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
-                .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(timeout)
+            .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+            .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+            .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+            .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
+            .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+            .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
 
         long startTime = System.currentTimeMillis();
         String target = selectEventMesh();
@@ -188,16 +181,18 @@ public class LiteProducer extends AbstractLiteClient {
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}", target, System.currentTimeMillis() - startTime, message, res);
+            logger.debug(
+                "publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
+                target, System.currentTimeMillis() - startTime, message, res);
         }
 
-        EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+        EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
         if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
             LiteMessage eventMeshMessage = new LiteMessage();
             SendMessageResponseBody.ReplyMessage replyMessage =
-                    JSON.parseObject(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
+                JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
             eventMeshMessage.setContent(replyMessage.body).setProp(replyMessage.properties)
-                    .setTopic(replyMessage.topic);
+                .setTopic(replyMessage.topic);
             return eventMeshMessage;
         }
 
@@ -209,39 +204,42 @@ public class LiteProducer extends AbstractLiteClient {
             start();
         }
         Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
-                "eventMeshMessage[topic] invalid");
+            "eventMeshMessage[topic] invalid");
         Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
-                "eventMeshMessage[content] invalid");
+            "eventMeshMessage[content] invalid");
         Preconditions.checkState(ObjectUtils.allNotNull(rrCallback),
-                "rrCallback invalid");
+            "rrCallback invalid");
         RequestParam requestParam = new RequestParam(HttpMethod.POST);
-        requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
-                .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
-                .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
-                .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
-                .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
-                .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
-                .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
-                .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
-                .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
-                .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
-                .setTimeout(timeout)
-                .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
-                .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
-                .addBody(SendMessageRequestBody.CONTENT, message.getContent())
-                .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
-                .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
-                .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+        requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+            String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
+            .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+            .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+            .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+            .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+            .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+            .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+            .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+            .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+            .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+            .setTimeout(timeout)
+            .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+            .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+            .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+            .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
+            .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+            .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
 
         long startTime = System.currentTimeMillis();
         String target = selectEventMesh();
 
         try (CloseableHttpClient httpClient = setHttpClient()) {
-            HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
+            HttpUtil.post(httpClient, null, target, requestParam,
+                new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message);
+            logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target,
+                System.currentTimeMillis() - startTime, message);
         }
     }
 }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
index aba948b..1dcefe5 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
@@ -17,27 +17,30 @@
 
 package org.apache.eventmesh.client.http.producer;
 
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-
-import com.alibaba.fastjson.JSON;
-
-import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.client.http.EventMeshRetObj;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.EventMeshException;
 import org.apache.eventmesh.common.LiteMessage;
 import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * RRCallbackResponseHandlerAdapter.
+ */
 public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String> {
 
     public Logger logger = LoggerFactory.getLogger(RRCallbackResponseHandlerAdapter.class);
@@ -50,7 +53,8 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
 
     private long timeout;
 
-    public RRCallbackResponseHandlerAdapter(LiteMessage liteMessage, RRCallback rrCallback, long timeout) {
+    public RRCallbackResponseHandlerAdapter(LiteMessage liteMessage, RRCallback rrCallback,
+                                            long timeout) {
         this.liteMessage = liteMessage;
         this.rrCallback = rrCallback;
         this.timeout = timeout;
@@ -58,26 +62,29 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
     }
 
     @Override
-    public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException {
+    public String handleResponse(HttpResponse response)
+        throws ClientProtocolException, IOException {
         if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
             rrCallback.onException(new EventMeshException(response.toString()));
             return response.toString();
         }
 
         if (System.currentTimeMillis() - createTime > timeout) {
-            String err = String.format("response too late, bizSeqNo=%s, uniqueId=%s, createTime=%s, ttl=%s, cost=%sms",
-                    liteMessage.getBizSeqNo(),
-                    liteMessage.getUniqueId(),
-                    DateFormatUtils.format(createTime, Constants.DATE_FORMAT),
-                    timeout,
-                    System.currentTimeMillis() - createTime);
+            String err = String.format(
+                "response too late, bizSeqNo=%s, uniqueId=%s, createTime=%s, ttl=%s, cost=%sms",
+                liteMessage.getBizSeqNo(),
+                liteMessage.getUniqueId(),
+                DateFormatUtils.format(createTime, Constants.DATE_FORMAT),
+                timeout,
+                System.currentTimeMillis() - createTime);
             logger.warn(err);
             rrCallback.onException(new EventMeshException(err));
             return err;
         }
 
-        String res = EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
-        EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+        String res =
+            EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+        EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
         if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
             rrCallback.onException(new EventMeshException(ret.getRetCode(), ret.getRetMsg()));
             return res;
@@ -86,9 +93,9 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
         LiteMessage liteMessage = new LiteMessage();
         try {
             SendMessageResponseBody.ReplyMessage replyMessage =
-                    JSON.parseObject(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
+                JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
             liteMessage.setContent(replyMessage.body).setProp(replyMessage.properties)
-                    .setTopic(replyMessage.topic);
+                .setTopic(replyMessage.topic);
             rrCallback.onSuccess(liteMessage);
         } catch (Exception ex) {
             rrCallback.onException(new EventMeshException(ex));
diff --git a/eventmesh-security-plugin/eventmesh-security-acl/build.gradle b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
index 1a18796..1ee3c75 100644
--- a/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    api project(":eventmesh-security-plugin:eventmesh-security-api")
+    implementation project(":eventmesh-security-plugin:eventmesh-security-api")
 
     testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
 }
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
index 0d41042..926cdba 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    api project(":eventmesh-spi")
+    compileOnly project(":eventmesh-spi")
 
     testImplementation project(":eventmesh-spi")
 }
diff --git a/eventmesh-spi/build.gradle b/eventmesh-spi/build.gradle
index f5ce59f..7c61ebc 100644
--- a/eventmesh-spi/build.gradle
+++ b/eventmesh-spi/build.gradle
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 dependencies {
-    api project(":eventmesh-common")
+    compileOnly project(":eventmesh-common")
     testImplementation project(":eventmesh-common")
 }
\ No newline at end of file
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
index 3692bc1..0ac3d40 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
@@ -17,12 +17,8 @@
 
 package org.apache.eventmesh.spi.loader;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import org.apache.eventmesh.spi.EventMeshSPI;
 import org.apache.eventmesh.spi.ExtensionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,6 +27,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -38,6 +35,12 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 /**
  * Load extension from '${eventMeshPluginDir}', the default loading directory is './plugin'
  */
@@ -46,30 +49,36 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
     private static final Logger logger = LoggerFactory.getLogger(JarExtensionClassLoader.class);
 
     private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
-            new ConcurrentHashMap<>(16);
+        new ConcurrentHashMap<>(16);
 
-    private static final String EVENTMESH_EXTENSION_PLUGIN_DIR = System.getProperty("eventMeshPluginDir",
+    private static final String EVENTMESH_EXTENSION_PLUGIN_DIR =
+        System.getProperty("eventMeshPluginDir",
             Joiner.on(File.separator).join(Lists.newArrayList(".", "plugin")));
 
     // META-INF/eventmesh
-    private static final String EVENTMESH_EXTENSION_META_DIR = Paths.get("META-INF", "eventmesh").toString();
+    private static final String EVENTMESH_EXTENSION_META_DIR =
+        Paths.get("META-INF", "eventmesh").toString();
 
     @Override
-    public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType, String extensionInstanceName) {
-        return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, t -> doLoadExtensionClass(t, extensionInstanceName));
+    public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType,
+                                                        String extensionInstanceName) {
+        return EXTENSION_CLASS_CACHE
+            .computeIfAbsent(extensionType, t -> doLoadExtensionClass(t, extensionInstanceName));
     }
 
-    private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType, String extensionInstanceName) {
+    private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType,
+                                                           String extensionInstanceName) {
         Map<String, Class<?>> extensionMap = new HashMap<>(16);
-        EventMeshSPI eventMeshSPIAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
+        EventMeshSPI eventMeshSpiAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
 
         String pluginDir = Paths.get(
-                EVENTMESH_EXTENSION_PLUGIN_DIR,
-                eventMeshSPIAnnotation.eventMeshExtensionType().getExtensionTypeName(),
-                extensionInstanceName
+            EVENTMESH_EXTENSION_PLUGIN_DIR,
+            eventMeshSpiAnnotation.eventMeshExtensionType().getExtensionTypeName(),
+            extensionInstanceName
         ).toString();
 
-        String extensionFileName = EVENTMESH_EXTENSION_META_DIR + File.separator + extensionType.getName();
+        String extensionFileName =
+            EVENTMESH_EXTENSION_META_DIR + File.separator + extensionType.getName();
         EventMeshUrlClassLoader urlClassLoader = EventMeshUrlClassLoader.getInstance();
         urlClassLoader.addUrls(loadJarPathFromResource(pluginDir));
         try {
@@ -106,10 +115,15 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
                 pluginUrls.addAll(loadJarPathFromResource(file.getPath()));
             }
         }
+        // TODO: Sort the paths here just to guarantee that the ConsumeMessageConcurrentlyService
+        //  defined in EventMesh is loaded rather than the one defined in RocketMQ
+        pluginUrls.sort(Comparator.comparing(URL::getPath));
         return pluginUrls;
     }
 
-    private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url, Class<T> extensionType) throws IOException {
+    private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url,
+                                                           Class<T> extensionType)
+        throws IOException {
         Map<String, Class<?>> extensionMap = new HashMap<>();
         try (InputStream inputStream = url.openStream()) {
             Properties properties = new Properties();
@@ -119,11 +133,13 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
                 String extensionClassStr = (String) extensionClass;
                 try {
                     Class<?> targetClass = urlClassLoader.loadClass(extensionClassStr);
-                    logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+                    logger
+                        .info("load extension class success, extensionType: {}, extensionClass: {}",
                             extensionType, targetClass);
                     if (!extensionType.isAssignableFrom(targetClass)) {
                         throw new ExtensionException(
-                                String.format("class: %s is not subClass of %s", targetClass, extensionType));
+                            String.format("class: %s is not subClass of %s", targetClass,
+                                extensionType));
                     }
                     extensionMap.put(extensionNameStr, targetClass);
                 } catch (ClassNotFoundException e) {
diff --git a/style/checkStyle.xml b/style/checkStyle.xml
index 98ec78f..8c0a35d 100644
--- a/style/checkStyle.xml
+++ b/style/checkStyle.xml
@@ -327,20 +327,20 @@
     <module name="AtclauseOrder">
       <property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
       <property name="target"
-                value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
+                value="CLASS_DEF, ENUM_DEF, METHOD_DEF, VARIABLE_DEF"/>
     </module>
     <module name="JavadocMethod">
       <property name="accessModifiers" value="public"/>
       <property name="allowMissingParamTags" value="true"/>
       <property name="allowMissingReturnTag" value="true"/>
       <property name="allowedAnnotations" value="Override, Test"/>
-      <property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF, COMPACT_CTOR_DEF"/>
+      <property name="tokens" value="METHOD_DEF, ANNOTATION_FIELD_DEF"/>
     </module>
     <module name="MissingJavadocMethod">
       <property name="scope" value="public"/>
       <property name="minLineCount" value="2"/>
       <property name="allowedAnnotations" value="Override, Test"/>
-      <property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF,
+      <property name="tokens" value="METHOD_DEF,
                                    COMPACT_CTOR_DEF"/>
     </module>
     <module name="MissingJavadocType">
@@ -362,11 +362,6 @@
     <module name="CommentsIndentation">
       <property name="tokens" value="SINGLE_LINE_COMMENT, BLOCK_COMMENT_BEGIN"/>
     </module>
-    <module name="SuppressionXpathFilter">
-      <property name="file" value="${org.checkstyle.google.suppressionxpathfilter.config}"
-                default="checkstyle-xpath-suppressions.xml" />
-      <property name="optional" value="true"/>
-    </module>
   </module>
 
 </module>
diff --git a/tool/license/allowed-licenses.txt b/tool/license/allowed-licenses.txt
index 976d6c3..580554d 100644
--- a/tool/license/allowed-licenses.txt
+++ b/tool/license/allowed-licenses.txt
@@ -24,7 +24,7 @@
         },
         {
             "moduleLicense": "Apache License, Version 2.0",
-            "moduleVersion": "1.2.71",
+            "moduleVersion": "1.2.69",
             "moduleName": "com.alibaba:fastjson"
         },
         {
@@ -372,7 +372,7 @@
         },
         {
             "moduleLicense": "Apache License 2.0",
-            "moduleVersion": "3.24.0-GA",
+            "moduleVersion": "3.20.0-GA",
             "moduleName": "org.javassist:javassist"
         },
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org