You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/17 23:46:06 UTC

[incubator-eventmesh] branch master updated: [ISSUE #3183]Refactor Codec (#3187)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a9c4bb798 [ISSUE #3183]Refactor Codec (#3187)
a9c4bb798 is described below

commit a9c4bb798d93f6fce01d8604c60761f584e8ac27
Author: mxsm <lj...@gmail.com>
AuthorDate: Sat Feb 18 07:46:00 2023 +0800

    [ISSUE #3183]Refactor Codec (#3187)
---
 .../eventmesh/common/protocol/tcp/codec/Codec.java | 38 +++++-----------------
 1 file changed, 8 insertions(+), 30 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index e11a8f24e..e3a2c6c30 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -31,18 +31,14 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.TimeZone;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import io.netty.handler.codec.ReplayingDecoder;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.common.base.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
@@ -55,22 +51,6 @@ public class Codec {
     private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh");
     private static final byte[] VERSION = serializeBytes("0000");
 
-    // todo: move to constants
-    public static final String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
-    public static final String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage";
-    public static final String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage";
-
-    // todo: use json util
-    private static ObjectMapper OBJECT_MAPPER;
-
-    static {
-        OBJECT_MAPPER = new ObjectMapper();
-        OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-        OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-        OBJECT_MAPPER.setTimeZone(TimeZone.getDefault());
-    }
-
     public static class Encoder extends MessageToByteEncoder<Package> {
         @Override
         public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
@@ -81,13 +61,13 @@ public class Codec {
                 log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg));
             }
 
-            final byte[] headerData = serializeBytes(OBJECT_MAPPER.writeValueAsString(header));
+            final byte[] headerData = JsonUtils.toJSONBytes(header);
             final byte[] bodyData;
 
-            if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
+            if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
                 bodyData = (byte[]) pkg.getBody();
             } else {
-                bodyData = serializeBytes(OBJECT_MAPPER.writeValueAsString(pkg.getBody()));
+                bodyData = JsonUtils.toJSONBytes(pkg.getBody());
             }
 
             int headerLength = ArrayUtils.getLength(headerData);
@@ -159,7 +139,7 @@ public class Codec {
             if (log.isDebugEnabled()) {
                 log.debug("Decode headerJson={}", deserializeBytes(headerData));
             }
-            return OBJECT_MAPPER.readValue(deserializeBytes(headerData), Header.class);
+            return JsonUtils.parseObject(headerData, Header.class);
         }
 
         private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException {
@@ -188,10 +168,10 @@ public class Codec {
         switch (command) {
             case HELLO_REQUEST:
             case RECOMMEND_REQUEST:
-                return OBJECT_MAPPER.readValue(bodyJsonString, UserAgent.class);
+                return JsonUtils.parseObject(bodyJsonString, UserAgent.class);
             case SUBSCRIBE_REQUEST:
             case UNSUBSCRIBE_REQUEST:
-                return OBJECT_MAPPER.readValue(bodyJsonString, Subscription.class);
+                return JsonUtils.parseObject(bodyJsonString, Subscription.class);
             case REQUEST_TO_SERVER:
             case RESPONSE_TO_SERVER:
             case ASYNC_MESSAGE_TO_SERVER:
@@ -208,7 +188,7 @@ public class Codec {
                 // just a string.
                 return bodyJsonString;
             case REDIRECT_TO_CLIENT:
-                return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
+                return JsonUtils.parseObject(bodyJsonString, RedirectInfo.class);
             default:
                 if (log.isWarnEnabled()) {
                     log.warn("Invalidate TCP command: {}", command);
@@ -239,6 +219,4 @@ public class Codec {
         }
         return str.getBytes(Constants.DEFAULT_CHARSET);
     }
-
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org