You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/15 03:47:26 UTC

[incubator-eventmesh] branch master updated: [ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f0ed6d3 [ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)
73f0ed6d3 is described below

commit 73f0ed6d370fcc9a99ef5296aacfa25e01cec500
Author: weihubeats <we...@163.com>
AuthorDate: Thu Dec 15 11:47:20 2022 +0800

    [ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)
    
    * simplify code
    
    * Simplify the code
---
 .../resolver/grpc/GrpcMessageProtocolResolver.java | 173 +++++++++------------
 1 file changed, 75 insertions(+), 98 deletions(-)

diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
index ed7c94eed..637f2b48e 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
@@ -47,42 +47,15 @@ public class GrpcMessageProtocolResolver {
         CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
 
         RequestHeader header = message.getHeader();
-        String env = StringUtils.defaultIfEmpty(header.getEnv(), getEventExtension(event, ProtocolKey.ENV));
-        String idc = StringUtils.defaultIfEmpty(header.getIdc(), getEventExtension(event, ProtocolKey.IDC));
-        String ip = StringUtils.defaultIfEmpty(header.getIp(), getEventExtension(event, ProtocolKey.IP));
-        String pid = StringUtils.defaultIfEmpty(header.getPid(), getEventExtension(event, ProtocolKey.PID));
-        String sys = StringUtils.defaultIfEmpty(header.getSys(), getEventExtension(event, ProtocolKey.SYS));
-        String language = StringUtils.defaultIfEmpty(header.getLanguage(), getEventExtension(event, ProtocolKey.LANGUAGE));
-        String protocolType = StringUtils.defaultIfEmpty(header.getProtocolType(), getEventExtension(event, ProtocolKey.PROTOCOL_TYPE));
-        String protocolDesc = StringUtils.defaultIfEmpty(header.getProtocolDesc(), getEventExtension(event, ProtocolKey.PROTOCOL_DESC));
-        String protocolVersion = StringUtils.defaultIfEmpty(header.getProtocolVersion(), getEventExtension(event, ProtocolKey.PROTOCOL_VERSION));
-        String uniqueId = StringUtils.defaultIfEmpty(message.getUniqueId(), getEventExtension(event, ProtocolKey.UNIQUE_ID));
-        String seqNum = StringUtils.defaultIfEmpty(message.getSeqNum(), getEventExtension(event, ProtocolKey.SEQ_NUM));
+
+        String seqNum = getMessageItemValue(message.getSeqNum(), event, ProtocolKey.SEQ_NUM);
+        String uniqueId = getMessageItemValue(message.getUniqueId(), event, ProtocolKey.UNIQUE_ID);
+        String ttl = getMessageItemValue(message.getTtl(), event, ProtocolKey.TTL);
+        String producerGroup = getMessageItemValue(message.getProducerGroup(), event, ProtocolKey.PRODUCERGROUP);
         String topic = StringUtils.defaultIfEmpty(message.getTopic(), event.getSubject());
-        String username = StringUtils.defaultIfEmpty(header.getUsername(), getEventExtension(event, ProtocolKey.USERNAME));
-        String passwd = StringUtils.defaultIfEmpty(header.getPassword(), getEventExtension(event, ProtocolKey.PASSWD));
-        String ttl = StringUtils.defaultIfEmpty(message.getTtl(), getEventExtension(event, ProtocolKey.TTL));
-        String producerGroup = StringUtils.defaultIfEmpty(message.getProducerGroup(), getEventExtension(event, ProtocolKey.PRODUCERGROUP));
-
-        CloudEventBuilder eventBuilder;
-        if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
-            eventBuilder = CloudEventBuilder.v1(event);
-        } else {
-            eventBuilder = CloudEventBuilder.v03(event);
-        }
 
+        CloudEventBuilder eventBuilder = builderCloudEventBuilder(header, event);
         eventBuilder.withSubject(topic)
-            .withExtension(ProtocolKey.ENV, env)
-            .withExtension(ProtocolKey.IDC, idc)
-            .withExtension(ProtocolKey.IP, ip)
-            .withExtension(ProtocolKey.PID, pid)
-            .withExtension(ProtocolKey.SYS, sys)
-            .withExtension(ProtocolKey.USERNAME, username)
-            .withExtension(ProtocolKey.PASSWD, passwd)
-            .withExtension(ProtocolKey.LANGUAGE, language)
-            .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
-            .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
-            .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
             .withExtension(ProtocolKey.SEQ_NUM, seqNum)
             .withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
             .withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
@@ -94,26 +67,25 @@ public class GrpcMessageProtocolResolver {
     }
 
     public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
-        String env = cloudEvent.getExtension(ProtocolKey.ENV) == null ? "env" : cloudEvent.getExtension(ProtocolKey.ENV).toString();
-        String idc = cloudEvent.getExtension(ProtocolKey.IDC) == null ? "idc" : cloudEvent.getExtension(ProtocolKey.IDC).toString();
-        String ip = cloudEvent.getExtension(ProtocolKey.IP) == null ? "127.0.0.1" : cloudEvent.getExtension(ProtocolKey.IP).toString();
-        String pid = cloudEvent.getExtension(ProtocolKey.PID) == null ? "123" : cloudEvent.getExtension(ProtocolKey.PID).toString();
-        String sys = cloudEvent.getExtension(ProtocolKey.SYS) == null ? "sys123" : cloudEvent.getExtension(ProtocolKey.SYS).toString();
-        String userName = cloudEvent.getExtension(ProtocolKey.USERNAME) == null ? "user" : cloudEvent.getExtension(ProtocolKey.USERNAME).toString();
-        String passwd = cloudEvent.getExtension(ProtocolKey.PASSWD) == null ? "pass" : cloudEvent.getExtension(ProtocolKey.PASSWD).toString();
-        String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? Constants.LANGUAGE_JAVA :
-            cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
-        String protocol = cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE) == null ? "protocol" :
-            cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE).toString();
-        String protocolDesc = cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC) == null ? "protocolDesc" :
-            cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC).toString();
-        String protocolVersion = cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION) == null ? "1.0" :
-            cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION).toString();
-        String seqNum = cloudEvent.getExtension(ProtocolKey.SEQ_NUM) == null ? "" : cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString();
-        String uniqueId = cloudEvent.getExtension(ProtocolKey.UNIQUE_ID) == null ? "" : cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString();
-        String producerGroup = cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP) == null ? "producerGroup" :
-            cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP).toString();
-        String ttl = cloudEvent.getExtension(ProtocolKey.TTL) == null ? "3000" : cloudEvent.getExtension(ProtocolKey.TTL).toString();
+
+        String env = getCloudEventExtension(cloudEvent, ProtocolKey.ENV, "env");
+        String idc = getCloudEventExtension(cloudEvent, ProtocolKey.IDC, "idc");
+
+        String ip = getCloudEventExtension(cloudEvent, ProtocolKey.IP, "127.0.0.1");
+        String pid = getCloudEventExtension(cloudEvent, ProtocolKey.PID, "123");
+        String sys = getCloudEventExtension(cloudEvent, ProtocolKey.SYS, "sys123");
+        String userName = getCloudEventExtension(cloudEvent, ProtocolKey.USERNAME, "user");
+        String passwd = getCloudEventExtension(cloudEvent, ProtocolKey.PASSWD, "pass");
+        String language = getCloudEventExtension(cloudEvent, ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
+
+        String protocol = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_TYPE, "protocol");
+        String protocolDesc = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_DESC, "protocolDesc");
+        String protocolVersion = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_VERSION, "1.0");
+        String seqNum = getCloudEventExtension(cloudEvent, ProtocolKey.SEQ_NUM, "");
+        String uniqueId = getCloudEventExtension(cloudEvent, ProtocolKey.UNIQUE_ID, "");
+
+        String producerGroup = getCloudEventExtension(cloudEvent, ProtocolKey.PRODUCERGROUP, "producerGroup");
+        String ttl = getCloudEventExtension(cloudEvent, ProtocolKey.TTL, "3000");
 
         RequestHeader header = RequestHeader.newBuilder()
             .setEnv(env).setIdc(idc)
@@ -145,6 +117,11 @@ public class GrpcMessageProtocolResolver {
         return new SimpleMessageWrapper(simpleMessage);
     }
 
+    private static String getCloudEventExtension(CloudEvent cloudEvent, String protocolKey, String defaultValue) {
+        Object extension = cloudEvent.getExtension(protocolKey);
+        return Objects.isNull(extension) ? defaultValue : extension.toString();
+    }
+
     public static List<CloudEvent> buildBatchEvents(BatchMessage batchMessage) {
         List<CloudEvent> cloudEvents = new ArrayList<>();
 
@@ -157,55 +134,20 @@ public class GrpcMessageProtocolResolver {
             EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType);
             CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
 
-            String env = StringUtils.isEmpty(header.getEnv()) ? event.getExtension(ProtocolKey.ENV).toString() : header.getEnv();
-            String idc = StringUtils.isEmpty(header.getIdc()) ? event.getExtension(ProtocolKey.IDC).toString() : header.getIdc();
-            String ip = StringUtils.isEmpty(header.getIp()) ? event.getExtension(ProtocolKey.IP).toString() : header.getIp();
-            String pid = StringUtils.isEmpty(header.getPid()) ? event.getExtension(ProtocolKey.PID).toString() : header.getPid();
-            String sys = StringUtils.isEmpty(header.getSys()) ? event.getExtension(ProtocolKey.SYS).toString() : header.getSys();
-
-            String language = StringUtils.isEmpty(header.getLanguage())
-                ? event.getExtension(ProtocolKey.LANGUAGE).toString() : header.getLanguage();
-
-            String protocolType = StringUtils.isEmpty(header.getProtocolType())
-                ? event.getExtension(ProtocolKey.PROTOCOL_TYPE).toString() : header.getProtocolType();
-
-            String protocolDesc = StringUtils.isEmpty(header.getProtocolDesc())
-                ? event.getExtension(ProtocolKey.PROTOCOL_DESC).toString() : header.getProtocolDesc();
+            String seqNum = getMessageItemValue(item.getSeqNum(), event, ProtocolKey.SEQ_NUM);
 
-            String protocolVersion = StringUtils.isEmpty(header.getProtocolVersion())
-                ? event.getExtension(ProtocolKey.PROTOCOL_VERSION).toString() : header.getProtocolVersion();
-
-            String username = StringUtils.isEmpty(header.getUsername()) ? event.getExtension(ProtocolKey.USERNAME).toString() : header.getUsername();
-            String passwd = StringUtils.isEmpty(header.getPassword()) ? event.getExtension(ProtocolKey.PASSWD).toString() : header.getPassword();
-
-            String seqNum = StringUtils.isEmpty(item.getSeqNum()) ? event.getExtension(ProtocolKey.SEQ_NUM).toString() : item.getSeqNum();
-            String uniqueId = StringUtils.isEmpty(item.getUniqueId()) ? event.getExtension(ProtocolKey.UNIQUE_ID).toString() : item.getUniqueId();
+            String uniqueId = getMessageItemValue(item.getUniqueId(), event, ProtocolKey.UNIQUE_ID);
+            String ttl = getMessageItemValue(item.getTtl(), event, ProtocolKey.TTL);
 
             String topic = StringUtils.isEmpty(batchMessage.getTopic()) ? event.getSubject() : batchMessage.getTopic();
 
             String producerGroup = StringUtils.isEmpty(batchMessage.getProducerGroup())
-                ? event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : batchMessage.getProducerGroup();
-            String ttl = StringUtils.isEmpty(item.getTtl()) ? event.getExtension(ProtocolKey.TTL).toString() : item.getTtl();
+                ? Objects.requireNonNull(event.getExtension(ProtocolKey.PRODUCERGROUP)).toString() : batchMessage.getProducerGroup();
+
+            CloudEventBuilder eventBuilder = builderCloudEventBuilder(header, event);
 
-            CloudEventBuilder eventBuilder;
-            if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
-                eventBuilder = CloudEventBuilder.v1(event);
-            } else {
-                eventBuilder = CloudEventBuilder.v03(event);
-            }
 
             eventBuilder.withSubject(topic)
-                .withExtension(ProtocolKey.ENV, env)
-                .withExtension(ProtocolKey.IDC, idc)
-                .withExtension(ProtocolKey.IP, ip)
-                .withExtension(ProtocolKey.PID, pid)
-                .withExtension(ProtocolKey.SYS, sys)
-                .withExtension(ProtocolKey.USERNAME, username)
-                .withExtension(ProtocolKey.PASSWD, passwd)
-                .withExtension(ProtocolKey.LANGUAGE, language)
-                .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
-                .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
-                .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
                 .withExtension(ProtocolKey.SEQ_NUM, seqNum)
                 .withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
                 .withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
@@ -217,9 +159,44 @@ public class GrpcMessageProtocolResolver {
         }
         return cloudEvents;
     }
-    
-    private static String getEventExtension(CloudEvent event, String protocolKey) {
-        return Objects.requireNonNull(event.getExtension(protocolKey)).toString();
+
+    private static String getHeaderValue(String value, CloudEvent event, String key) {
+        return StringUtils.isEmpty(value) ? Objects.requireNonNull(event.getExtension(key)).toString() : value;
+    }
+
+    private static String getMessageItemValue(String value, CloudEvent event, String key) {
+        return StringUtils.isEmpty(value) ? Objects.requireNonNull(event.getExtension(key)).toString() : value;
+    }
+
+    private static CloudEventBuilder builderCloudEventBuilder(RequestHeader header, CloudEvent event) {
+        String env = getHeaderValue(header.getEnv(), event, ProtocolKey.ENV);
+        String idc = getHeaderValue(header.getIdc(), event, ProtocolKey.IDC);
+        String ip = getHeaderValue(header.getIp(), event, ProtocolKey.IP);
+        String pid = getHeaderValue(header.getPid(), event, ProtocolKey.PID);
+        String sys = getHeaderValue(header.getSys(), event, ProtocolKey.SYS);
+        String language = getHeaderValue(header.getLanguage(), event, ProtocolKey.LANGUAGE);
+        String protocolType = getHeaderValue(header.getProtocolType(), event, ProtocolKey.PROTOCOL_TYPE);
+        String protocolDesc = getHeaderValue(header.getProtocolDesc(), event, ProtocolKey.PROTOCOL_DESC);
+        String protocolVersion = getHeaderValue(header.getProtocolVersion(), event, ProtocolKey.PROTOCOL_VERSION);
+        String username = getHeaderValue(header.getUsername(), event, ProtocolKey.USERNAME);
+        String passwd = getHeaderValue(header.getPassword(), event, ProtocolKey.PASSWD);
+
+        CloudEventBuilder eventBuilder = StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)
+            ? CloudEventBuilder.v1(event) : CloudEventBuilder.v03(event);
+
+        return eventBuilder
+            .withExtension(ProtocolKey.ENV, env)
+            .withExtension(ProtocolKey.IDC, idc)
+            .withExtension(ProtocolKey.IP, ip)
+            .withExtension(ProtocolKey.PID, pid)
+            .withExtension(ProtocolKey.SYS, sys)
+            .withExtension(ProtocolKey.USERNAME, username)
+            .withExtension(ProtocolKey.PASSWD, passwd)
+            .withExtension(ProtocolKey.LANGUAGE, language)
+            .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
+            .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
+            .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion);
+
     }
-    
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org