You are viewing a plain-text version of this content; the canonical link is not included in this copy.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/17 23:46:06 UTC
[incubator-eventmesh] branch master updated: [ISSUE #3183]Refactor Codec (#3187)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a9c4bb798 [ISSUE #3183]Refactor Codec (#3187)
a9c4bb798 is described below
commit a9c4bb798d93f6fce01d8604c60761f584e8ac27
Author: mxsm <lj...@gmail.com>
AuthorDate: Sat Feb 18 07:46:00 2023 +0800
[ISSUE #3183]Refactor Codec (#3187)
---
.../eventmesh/common/protocol/tcp/codec/Codec.java | 38 +++++-----------------
1 file changed, 8 insertions(+), 30 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index e11a8f24e..e3a2c6c30 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -31,18 +31,14 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.List;
-import java.util.TimeZone;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;
-import com.fasterxml.jackson.annotation.JsonInclude;
+
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
@@ -55,22 +51,6 @@ public class Codec {
private static final byte[] CONSTANT_MAGIC_FLAG = serializeBytes("EventMesh");
private static final byte[] VERSION = serializeBytes("0000");
- // todo: move to constants
- public static final String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
- public static final String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage";
- public static final String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage";
-
- // todo: use json util
- private static ObjectMapper OBJECT_MAPPER;
-
- static {
- OBJECT_MAPPER = new ObjectMapper();
- OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
- OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
- OBJECT_MAPPER.setTimeZone(TimeZone.getDefault());
- }
-
public static class Encoder extends MessageToByteEncoder<Package> {
@Override
public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf out) throws Exception {
@@ -81,13 +61,13 @@ public class Codec {
log.debug("Encoder pkg={}", JsonUtils.toJSONString(pkg));
}
- final byte[] headerData = serializeBytes(OBJECT_MAPPER.writeValueAsString(header));
+ final byte[] headerData = JsonUtils.toJSONBytes(header);
final byte[] bodyData;
- if (StringUtils.equals(CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
+ if (StringUtils.equals(Constants.CLOUD_EVENTS_PROTOCOL_NAME, header.getStringProperty(Constants.PROTOCOL_TYPE))) {
bodyData = (byte[]) pkg.getBody();
} else {
- bodyData = serializeBytes(OBJECT_MAPPER.writeValueAsString(pkg.getBody()));
+ bodyData = JsonUtils.toJSONBytes(pkg.getBody());
}
int headerLength = ArrayUtils.getLength(headerData);
@@ -159,7 +139,7 @@ public class Codec {
if (log.isDebugEnabled()) {
log.debug("Decode headerJson={}", deserializeBytes(headerData));
}
- return OBJECT_MAPPER.readValue(deserializeBytes(headerData), Header.class);
+ return JsonUtils.parseObject(headerData, Header.class);
}
private Object parseBody(ByteBuf in, Header header, int bodyLength) throws JsonProcessingException {
@@ -188,10 +168,10 @@ public class Codec {
switch (command) {
case HELLO_REQUEST:
case RECOMMEND_REQUEST:
- return OBJECT_MAPPER.readValue(bodyJsonString, UserAgent.class);
+ return JsonUtils.parseObject(bodyJsonString, UserAgent.class);
case SUBSCRIBE_REQUEST:
case UNSUBSCRIBE_REQUEST:
- return OBJECT_MAPPER.readValue(bodyJsonString, Subscription.class);
+ return JsonUtils.parseObject(bodyJsonString, Subscription.class);
case REQUEST_TO_SERVER:
case RESPONSE_TO_SERVER:
case ASYNC_MESSAGE_TO_SERVER:
@@ -208,7 +188,7 @@ public class Codec {
// just a string.
return bodyJsonString;
case REDIRECT_TO_CLIENT:
- return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
+ return JsonUtils.parseObject(bodyJsonString, RedirectInfo.class);
default:
if (log.isWarnEnabled()) {
log.warn("Invalidate TCP command: {}", command);
@@ -239,6 +219,4 @@ public class Codec {
}
return str.getBytes(Constants.DEFAULT_CHARSET);
}
-
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org