You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/03 09:20:45 UTC

[incubator-eventmesh] branch master updated: fix issue2780

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 25450ad24 fix issue2780
     new 26eab9911 Merge pull request #2781 from jonyangx/issue2780
25450ad24 is described below

commit 25450ad24d03243c12c38cdc4cc7949de6522abd
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Jan 1 14:38:48 2023 +0800

    fix issue2780
---
 .../protocol/http/consumer/EventMeshConsumer.java  |  80 +++++------
 .../eventmesh/runtime/util/EventMeshUtil.java      | 152 +++++++++++----------
 2 files changed, 121 insertions(+), 111 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 99da5630a..ffcbfba9d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -104,21 +104,21 @@ public class EventMeshConsumer {
 
         EventListener clusterEventListener = (event, context) -> {
             String protocolVersion =
-                Objects.requireNonNull(event.getSpecVersion()).toString();
+                    Objects.requireNonNull(event.getSpecVersion()).toString();
 
             Span span = TraceUtils.prepareServerSpan(
-                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
-                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+                    EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+                    EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
             try {
                 String topic = event.getSubject();
                 String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO)).toString();
                 String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID)).toString();
 
                 event = CloudEventBuilder.from(event)
-                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
-                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
-                    .build();
+                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+                        .build();
                 if (messageLogger.isDebugEnabled()) {
                     messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
                 } else {
@@ -126,12 +126,12 @@ public class EventMeshConsumer {
                 }
 
                 ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
-                    topic, null);
+                        topic, null);
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
 
                 if (currentTopicConfig == null) {
                     logger.error("no topicConfig found, consumerGroup:{} topic:{}",
-                        consumerGroupConf.getConsumerGroup(), topic);
+                            consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -142,11 +142,13 @@ public class EventMeshConsumer {
                 }
 
                 SubscriptionItem subscriptionItem =
-                    consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
-                HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
-                    consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                    topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
-                    consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+                        consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+                HandleMsgContext handleMsgContext = new HandleMsgContext(
+                        EventMeshUtil.buildPushMsgSeqNo(),
+                        consumerGroupConf.getConsumerGroup(),
+                        EventMeshConsumer.this,
+                        topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+                        consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -176,19 +178,19 @@ public class EventMeshConsumer {
         EventListener broadcastEventListener = (event, context) -> {
 
             String protocolVersion =
-                Objects.requireNonNull(event.getSpecVersion()).toString();
+                    Objects.requireNonNull(event.getSpecVersion()).toString();
 
             Span span = TraceUtils.prepareServerSpan(
-                EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
-                EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+                    EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+                    EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
             try {
 
                 event = CloudEventBuilder.from(event)
-                    .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
-                        String.valueOf(System.currentTimeMillis()))
-                    .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
-                        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
-                    .build();
+                        .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+                                String.valueOf(System.currentTimeMillis()))
+                        .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+                                eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+                        .build();
 
                 String topic = event.getSubject();
                 String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO, "");
@@ -198,18 +200,18 @@ public class EventMeshConsumer {
                     messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
                 } else {
                     messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
-                        topic, bizSeqNo,
-                        uniqueId);
+                            topic, bizSeqNo,
+                            uniqueId);
                 }
 
                 ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
-                    consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+                        consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
-                    (EventMeshAsyncConsumeContext) context;
+                        (EventMeshAsyncConsumeContext) context;
 
                 if (currentTopicConfig == null) {
                     logger.error("no topicConfig found, consumerGroup:{} topic:{}",
-                        consumerGroupConf.getConsumerGroup(), topic);
+                            consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -220,15 +222,15 @@ public class EventMeshConsumer {
                 }
 
                 SubscriptionItem subscriptionItem =
-                    consumerGroupConf.getConsumerGroupTopicConf().get(topic)
-                        .getSubscriptionItem();
+                        consumerGroupConf.getConsumerGroupTopicConf().get(topic)
+                                .getSubscriptionItem();
                 HandleMsgContext handleMsgContext =
-                    new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
-                        consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
-                        topic, event, subscriptionItem,
-                        eventMeshAsyncConsumeContext.getAbstractContext(),
-                        consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
-                        currentTopicConfig);
+                        new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+                                consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+                                topic, event, subscriptionItem,
+                                eventMeshAsyncConsumeContext.getAbstractContext(),
+                                consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
+                                currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -310,16 +312,16 @@ public class EventMeshConsumer {
     public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception {
 
         EventMeshProducer sendMessageBack
-            = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
+                = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
 
         if (sendMessageBack == null) {
             logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
-                consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+                    consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
             return;
         }
 
         final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack,
-            eventMeshHTTPServer);
+                eventMeshHTTPServer);
 
         sendMessageBack.send(sendMessageBackContext, new SendCallback() {
             @Override
@@ -329,7 +331,7 @@ public class EventMeshConsumer {
             @Override
             public void onException(OnExceptionContext context) {
                 logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
-                    consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+                        consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
             }
         });
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 01ffb58d2..1303529aa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -37,11 +37,11 @@ import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -63,29 +63,42 @@ public class EventMeshUtil {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshUtil.class);
 
-    private static final Logger TCPLOGGER = LoggerFactory.getLogger("tcpMonitor");
-
     public static String buildPushMsgSeqNo() {
-        return StringUtils.rightPad(String.valueOf(System.currentTimeMillis()), 6)
-                + RandomStringUtils.generateNum(4);
+        return new StringBuilder()
+                .append(StringUtils.rightPad(String.valueOf(System.currentTimeMillis()), 6))
+                .append(RandomStringUtils.generateNum(4))
+                .toString();
+
     }
 
-    public static String buildMeshClientID(String clientGroup, String meshCluster) {
-        return StringUtils.trim(clientGroup)
-                + "(" + StringUtils.trim(meshCluster) + ")"
-                + "-" + EventMeshVersion.getCurrentVersionDesc()
-                + "-" + ThreadUtils.getPID();
+    public static String buildMeshClientID(final String clientGroup, final String meshCluster) {
+        return new StringBuilder()
+                .append(StringUtils.trim(clientGroup))
+                .append('(')
+                .append(StringUtils.trim(meshCluster))
+                .append(')')
+                .append('-')
+                .append(EventMeshVersion.getCurrentVersionDesc())
+                .append('-')
+                .append(ThreadUtils.getPID())
+                .toString();
     }
 
-    public static String buildMeshTcpClientID(String clientSysId, String purpose, String meshCluster) {
-        return StringUtils.trim(clientSysId)
-                + "-" + StringUtils.trim(purpose)
-                + "-" + StringUtils.trim(meshCluster)
-                + "-" + EventMeshVersion.getCurrentVersionDesc()
-                + "-" + ThreadUtils.getPID();
+    public static String buildMeshTcpClientID(final String clientSysId, final String purpose,
+                                              final String meshCluster) {
+        return new StringBuilder().append(StringUtils.trim(clientSysId))
+                .append('-')
+                .append(StringUtils.trim(purpose))
+                .append('-')
+                .append(StringUtils.trim(meshCluster))
+                .append('-')
+                .append(EventMeshVersion.getCurrentVersionDesc())
+                .append('-')
+                .append(ThreadUtils.getPID())
+                .toString();
     }
 
-    public static String buildClientGroup(String systemId) {
+    public static String buildClientGroup(final String systemId) {
         return systemId;
     }
 
@@ -95,24 +108,25 @@ public class EventMeshUtil {
      * @param e
      * @return stacktrace
      */
-    public static String stackTrace(Throwable e) {
+    public static String stackTrace(final Throwable e) {
         return stackTrace(e, 0);
     }
 
-    public static String stackTrace(Throwable e, int level) {
+    public static String stackTrace(final Throwable e, final int level) {
         if (e == null) {
             return null;
         }
 
-        StackTraceElement[] eles = e.getStackTrace();
-        int localLevel = (level == 0) ? eles.length : level;
-        StringBuilder sb = new StringBuilder();
+        final StackTraceElement[] eles = e.getStackTrace();
+        final int localLevel = (level == 0) ? eles.length : level;
+        final StringBuilder sb = new StringBuilder();
         sb.append(e.getMessage()).append(System.lineSeparator());
 
         int innerLevel = 0;
-        for (StackTraceElement ele : eles) {
+        for (final StackTraceElement ele : eles) {
             sb.append(ele).append(System.lineSeparator());
-            if (++innerLevel >= localLevel) {
+            innerLevel++;
+            if (innerLevel >= localLevel) {
                 break;
             }
         }
@@ -120,12 +134,11 @@ public class EventMeshUtil {
     }
 
     public static ObjectMapper createJsoner() {
-        ObjectMapper jsonMapper = new ObjectMapper();
-        jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        jsonMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-        jsonMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-        jsonMapper.setTimeZone(TimeZone.getDefault());
-        return jsonMapper;
+        return new ObjectMapper()
+                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+                .setTimeZone(TimeZone.getDefault());
     }
 
 
@@ -135,8 +148,8 @@ public class EventMeshUtil {
      * @param eventMeshMessage
      * @return message string
      */
-    public static String printMqMessage(EventMeshMessage eventMeshMessage) {
-        Map<String, String> properties = eventMeshMessage.getProperties();
+    public static String printMqMessage(final EventMeshMessage eventMeshMessage) {
+        final Map<String, String> properties = eventMeshMessage.getProperties();
 
         String keys = properties.get(EventMeshConstants.KEYS_UPPERCASE);
         if (StringUtils.isBlank(keys)) {
@@ -147,8 +160,7 @@ public class EventMeshUtil {
                 properties.get(EventMeshConstants.TTL), properties.get(EventMeshConstants.RR_REQUEST_UNIQ_ID), keys);
     }
 
-    public static String getMessageBizSeq(CloudEvent event) {
-
+    public static String getMessageBizSeq(final CloudEvent event) {
         String keys = (String) event.getExtension(EventMeshConstants.KEYS_UPPERCASE);
         if (StringUtils.isBlank(keys)) {
             keys = (String) event.getExtension(EventMeshConstants.KEYS_LOWERCASE);
@@ -157,10 +169,9 @@ public class EventMeshUtil {
         return keys;
     }
 
-    public static Map<String, String> getEventProp(CloudEvent event) {
-        Set<String> extensionSet = event.getExtensionNames();
-        Map<String, String> propMap = new HashMap<>();
-        for (String extensionKey : extensionSet) {
+    public static Map<String, String> getEventProp(final CloudEvent event) {
+        final Map<String, String> propMap = new HashMap<>();
+        for (final String extensionKey : event.getExtensionNames()) {
             propMap.put(extensionKey, event.getExtension(extensionKey) == null ? ""
                     : event.getExtension(extensionKey).toString());
         }
@@ -169,19 +180,18 @@ public class EventMeshUtil {
 
     public static String getLocalAddr() {
         //priority of networkInterface when generating client ip
-        String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
+        final String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("networkInterface.priority: {}", priority);
         }
 
-        List<String> preferList = new ArrayList<String>();
-        for (String eth : priority.split("<")) {
-            preferList.add(eth);
-        }
+        final List<String> preferList = new ArrayList<>();
+        Arrays.stream(priority.split("<")).forEach(preferList::add);
+
         NetworkInterface preferNetworkInterface = null;
 
         try {
-            Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
+            final Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
             while (enumeration1.hasMoreElements()) {
                 final NetworkInterface networkInterface = enumeration1.nextElement();
                 if (!preferList.contains(networkInterface.getName())) {
@@ -196,8 +206,8 @@ public class EventMeshUtil {
             }
 
             // Traversal Network interface to get the first non-loopback and non-private address
-            ArrayList<String> ipv4Result = new ArrayList<String>();
-            ArrayList<String> ipv6Result = new ArrayList<String>();
+            final ArrayList<String> ipv4Result = new ArrayList<>();
+            final ArrayList<String> ipv6Result = new ArrayList<>();
 
             if (preferNetworkInterface != null) {
                 if (LOGGER.isDebugEnabled()) {
@@ -209,7 +219,7 @@ public class EventMeshUtil {
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("no preferNetworkInterface");
                 }
-                Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+                final Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
                 while (enumeration.hasMoreElements()) {
                     final NetworkInterface networkInterface = enumeration.nextElement();
                     final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
@@ -219,12 +229,10 @@ public class EventMeshUtil {
 
             // prefer ipv4
             if (!ipv4Result.isEmpty()) {
-                for (String ip : ipv4Result) {
-                    if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
-                        continue;
+                for (final String ip : ipv4Result) {
+                    if (!ip.startsWith("127.0") && !ip.startsWith("192.168")) {
+                        return ip;
                     }
-
-                    return ip;
                 }
 
                 return ipv4Result.get(ipv4Result.size() - 1);
@@ -248,8 +256,8 @@ public class EventMeshUtil {
         }
     }
 
-    private static void getIpResult(List<String> ipv4Result, List<String> ipv6Result,
-                                    Enumeration<InetAddress> en) {
+    private static void getIpResult(final List<String> ipv4Result, final List<String> ipv6Result,
+                                    final Enumeration<InetAddress> en) {
         while (en.hasMoreElements()) {
             final InetAddress address = en.nextElement();
             if (address.isLoopbackAddress()) {
@@ -265,26 +273,26 @@ public class EventMeshUtil {
         }
     }
 
-    public static String buildUserAgentClientId(UserAgent client) {
+    public static String buildUserAgentClientId(final UserAgent client) {
         if (client == null) {
             return null;
         }
 
-        StringBuilder sb = new StringBuilder();
-        sb.append(client.getSubsystem())
-                .append("-")
-                .append("-")
+        return new StringBuilder()
+                .append(client.getSubsystem())
+                .append('-')
+                .append('-')
                 .append(client.getPid())
-                .append("-")
+                .append('-')
                 .append(client.getHost())
-                .append(":")
-                .append(client.getPort());
-        return sb.toString();
+                .append(':')
+                .append(client.getPort())
+                .toString();
     }
 
-    public static void printState(ThreadPoolExecutor scheduledExecutorService) {
-        if (TCPLOGGER.isInfoEnabled()) {
-            TCPLOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
+    public static void printState(final ThreadPoolExecutor scheduledExecutorService) {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
                     .getThreadNamePrefix(), scheduledExecutorService.getQueue().size(), scheduledExecutorService
                     .getPoolSize(), scheduledExecutorService.getActiveCount(), scheduledExecutorService
                     .getCompletedTaskCount());
@@ -300,7 +308,7 @@ public class EventMeshUtil {
      * @throws ClassNotFoundException
      */
     @SuppressWarnings("unchecked")
-    public static <T> T cloneObject(T object) throws IOException, ClassNotFoundException {
+    public static <T> T cloneObject(final T object) throws IOException, ClassNotFoundException {
         try (ByteArrayOutputStream byOut = new ByteArrayOutputStream();
              ObjectOutputStream outputStream = new ObjectOutputStream(byOut)) {
 
@@ -315,14 +323,14 @@ public class EventMeshUtil {
 
     }
 
-    public static Map<String, Object> getCloudEventExtensionMap(String protocolVersion,
-                                                                CloudEvent cloudEvent) {
-        EventMeshCloudEventWriter eventMeshCloudEventWriter = new EventMeshCloudEventWriter();
+    public static Map<String, Object> getCloudEventExtensionMap(final String protocolVersion,
+                                                                final CloudEvent cloudEvent) {
+        final EventMeshCloudEventWriter eventMeshCloudEventWriter = new EventMeshCloudEventWriter();
         if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)
-                && (cloudEvent instanceof CloudEventV1)) {
+                && cloudEvent instanceof CloudEventV1) {
             ((CloudEventV1) cloudEvent).readContext(eventMeshCloudEventWriter);
         } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)
-                && (cloudEvent instanceof CloudEventV03)) {
+                && cloudEvent instanceof CloudEventV03) {
             ((CloudEventV03) cloudEvent).readContext(eventMeshCloudEventWriter);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org