You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/15 03:47:26 UTC
[incubator-eventmesh] branch master updated: [ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 73f0ed6d3 [ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)
73f0ed6d3 is described below
commit 73f0ed6d370fcc9a99ef5296aacfa25e01cec500
Author: weihubeats <we...@163.com>
AuthorDate: Thu Dec 15 11:47:20 2022 +0800
[ISSUE #2518] Method invocation 'toString' may produce 'NullPointerException' [GrpcMessageProtocolResolver] (#2587)
* simplify code
* Simplify the code
---
.../resolver/grpc/GrpcMessageProtocolResolver.java | 173 +++++++++------------
1 file changed, 75 insertions(+), 98 deletions(-)
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
index ed7c94eed..637f2b48e 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java
@@ -47,42 +47,15 @@ public class GrpcMessageProtocolResolver {
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
RequestHeader header = message.getHeader();
- String env = StringUtils.defaultIfEmpty(header.getEnv(), getEventExtension(event, ProtocolKey.ENV));
- String idc = StringUtils.defaultIfEmpty(header.getIdc(), getEventExtension(event, ProtocolKey.IDC));
- String ip = StringUtils.defaultIfEmpty(header.getIp(), getEventExtension(event, ProtocolKey.IP));
- String pid = StringUtils.defaultIfEmpty(header.getPid(), getEventExtension(event, ProtocolKey.PID));
- String sys = StringUtils.defaultIfEmpty(header.getSys(), getEventExtension(event, ProtocolKey.SYS));
- String language = StringUtils.defaultIfEmpty(header.getLanguage(), getEventExtension(event, ProtocolKey.LANGUAGE));
- String protocolType = StringUtils.defaultIfEmpty(header.getProtocolType(), getEventExtension(event, ProtocolKey.PROTOCOL_TYPE));
- String protocolDesc = StringUtils.defaultIfEmpty(header.getProtocolDesc(), getEventExtension(event, ProtocolKey.PROTOCOL_DESC));
- String protocolVersion = StringUtils.defaultIfEmpty(header.getProtocolVersion(), getEventExtension(event, ProtocolKey.PROTOCOL_VERSION));
- String uniqueId = StringUtils.defaultIfEmpty(message.getUniqueId(), getEventExtension(event, ProtocolKey.UNIQUE_ID));
- String seqNum = StringUtils.defaultIfEmpty(message.getSeqNum(), getEventExtension(event, ProtocolKey.SEQ_NUM));
+
+ String seqNum = getMessageItemValue(message.getSeqNum(), event, ProtocolKey.SEQ_NUM);
+ String uniqueId = getMessageItemValue(message.getUniqueId(), event, ProtocolKey.UNIQUE_ID);
+ String ttl = getMessageItemValue(message.getTtl(), event, ProtocolKey.TTL);
+ String producerGroup = getMessageItemValue(message.getProducerGroup(), event, ProtocolKey.PRODUCERGROUP);
String topic = StringUtils.defaultIfEmpty(message.getTopic(), event.getSubject());
- String username = StringUtils.defaultIfEmpty(header.getUsername(), getEventExtension(event, ProtocolKey.USERNAME));
- String passwd = StringUtils.defaultIfEmpty(header.getPassword(), getEventExtension(event, ProtocolKey.PASSWD));
- String ttl = StringUtils.defaultIfEmpty(message.getTtl(), getEventExtension(event, ProtocolKey.TTL));
- String producerGroup = StringUtils.defaultIfEmpty(message.getProducerGroup(), getEventExtension(event, ProtocolKey.PRODUCERGROUP));
-
- CloudEventBuilder eventBuilder;
- if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
- eventBuilder = CloudEventBuilder.v1(event);
- } else {
- eventBuilder = CloudEventBuilder.v03(event);
- }
+ CloudEventBuilder eventBuilder = builderCloudEventBuilder(header, event);
eventBuilder.withSubject(topic)
- .withExtension(ProtocolKey.ENV, env)
- .withExtension(ProtocolKey.IDC, idc)
- .withExtension(ProtocolKey.IP, ip)
- .withExtension(ProtocolKey.PID, pid)
- .withExtension(ProtocolKey.SYS, sys)
- .withExtension(ProtocolKey.USERNAME, username)
- .withExtension(ProtocolKey.PASSWD, passwd)
- .withExtension(ProtocolKey.LANGUAGE, language)
- .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
- .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
- .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
@@ -94,26 +67,25 @@ public class GrpcMessageProtocolResolver {
}
public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
- String env = cloudEvent.getExtension(ProtocolKey.ENV) == null ? "env" : cloudEvent.getExtension(ProtocolKey.ENV).toString();
- String idc = cloudEvent.getExtension(ProtocolKey.IDC) == null ? "idc" : cloudEvent.getExtension(ProtocolKey.IDC).toString();
- String ip = cloudEvent.getExtension(ProtocolKey.IP) == null ? "127.0.0.1" : cloudEvent.getExtension(ProtocolKey.IP).toString();
- String pid = cloudEvent.getExtension(ProtocolKey.PID) == null ? "123" : cloudEvent.getExtension(ProtocolKey.PID).toString();
- String sys = cloudEvent.getExtension(ProtocolKey.SYS) == null ? "sys123" : cloudEvent.getExtension(ProtocolKey.SYS).toString();
- String userName = cloudEvent.getExtension(ProtocolKey.USERNAME) == null ? "user" : cloudEvent.getExtension(ProtocolKey.USERNAME).toString();
- String passwd = cloudEvent.getExtension(ProtocolKey.PASSWD) == null ? "pass" : cloudEvent.getExtension(ProtocolKey.PASSWD).toString();
- String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? Constants.LANGUAGE_JAVA :
- cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString();
- String protocol = cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE) == null ? "protocol" :
- cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE).toString();
- String protocolDesc = cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC) == null ? "protocolDesc" :
- cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC).toString();
- String protocolVersion = cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION) == null ? "1.0" :
- cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION).toString();
- String seqNum = cloudEvent.getExtension(ProtocolKey.SEQ_NUM) == null ? "" : cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString();
- String uniqueId = cloudEvent.getExtension(ProtocolKey.UNIQUE_ID) == null ? "" : cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString();
- String producerGroup = cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP) == null ? "producerGroup" :
- cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP).toString();
- String ttl = cloudEvent.getExtension(ProtocolKey.TTL) == null ? "3000" : cloudEvent.getExtension(ProtocolKey.TTL).toString();
+
+ String env = getCloudEventExtension(cloudEvent, ProtocolKey.ENV, "env");
+ String idc = getCloudEventExtension(cloudEvent, ProtocolKey.IDC, "idc");
+
+ String ip = getCloudEventExtension(cloudEvent, ProtocolKey.IP, "127.0.0.1");
+ String pid = getCloudEventExtension(cloudEvent, ProtocolKey.PID, "123");
+ String sys = getCloudEventExtension(cloudEvent, ProtocolKey.SYS, "sys123");
+ String userName = getCloudEventExtension(cloudEvent, ProtocolKey.USERNAME, "user");
+ String passwd = getCloudEventExtension(cloudEvent, ProtocolKey.PASSWD, "pass");
+ String language = getCloudEventExtension(cloudEvent, ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
+
+ String protocol = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_TYPE, "protocol");
+ String protocolDesc = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_DESC, "protocolDesc");
+ String protocolVersion = getCloudEventExtension(cloudEvent, ProtocolKey.PROTOCOL_VERSION, "1.0");
+ String seqNum = getCloudEventExtension(cloudEvent, ProtocolKey.SEQ_NUM, "");
+ String uniqueId = getCloudEventExtension(cloudEvent, ProtocolKey.UNIQUE_ID, "");
+
+ String producerGroup = getCloudEventExtension(cloudEvent, ProtocolKey.PRODUCERGROUP, "producerGroup");
+ String ttl = getCloudEventExtension(cloudEvent, ProtocolKey.TTL, "3000");
RequestHeader header = RequestHeader.newBuilder()
.setEnv(env).setIdc(idc)
@@ -145,6 +117,11 @@ public class GrpcMessageProtocolResolver {
return new SimpleMessageWrapper(simpleMessage);
}
+ private static String getCloudEventExtension(CloudEvent cloudEvent, String protocolKey, String defaultValue) {
+ Object extension = cloudEvent.getExtension(protocolKey);
+ return Objects.isNull(extension) ? defaultValue : extension.toString();
+ }
+
public static List<CloudEvent> buildBatchEvents(BatchMessage batchMessage) {
List<CloudEvent> cloudEvents = new ArrayList<>();
@@ -157,55 +134,20 @@ public class GrpcMessageProtocolResolver {
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType);
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
- String env = StringUtils.isEmpty(header.getEnv()) ? event.getExtension(ProtocolKey.ENV).toString() : header.getEnv();
- String idc = StringUtils.isEmpty(header.getIdc()) ? event.getExtension(ProtocolKey.IDC).toString() : header.getIdc();
- String ip = StringUtils.isEmpty(header.getIp()) ? event.getExtension(ProtocolKey.IP).toString() : header.getIp();
- String pid = StringUtils.isEmpty(header.getPid()) ? event.getExtension(ProtocolKey.PID).toString() : header.getPid();
- String sys = StringUtils.isEmpty(header.getSys()) ? event.getExtension(ProtocolKey.SYS).toString() : header.getSys();
-
- String language = StringUtils.isEmpty(header.getLanguage())
- ? event.getExtension(ProtocolKey.LANGUAGE).toString() : header.getLanguage();
-
- String protocolType = StringUtils.isEmpty(header.getProtocolType())
- ? event.getExtension(ProtocolKey.PROTOCOL_TYPE).toString() : header.getProtocolType();
-
- String protocolDesc = StringUtils.isEmpty(header.getProtocolDesc())
- ? event.getExtension(ProtocolKey.PROTOCOL_DESC).toString() : header.getProtocolDesc();
+ String seqNum = getMessageItemValue(item.getSeqNum(), event, ProtocolKey.SEQ_NUM);
- String protocolVersion = StringUtils.isEmpty(header.getProtocolVersion())
- ? event.getExtension(ProtocolKey.PROTOCOL_VERSION).toString() : header.getProtocolVersion();
-
- String username = StringUtils.isEmpty(header.getUsername()) ? event.getExtension(ProtocolKey.USERNAME).toString() : header.getUsername();
- String passwd = StringUtils.isEmpty(header.getPassword()) ? event.getExtension(ProtocolKey.PASSWD).toString() : header.getPassword();
-
- String seqNum = StringUtils.isEmpty(item.getSeqNum()) ? event.getExtension(ProtocolKey.SEQ_NUM).toString() : item.getSeqNum();
- String uniqueId = StringUtils.isEmpty(item.getUniqueId()) ? event.getExtension(ProtocolKey.UNIQUE_ID).toString() : item.getUniqueId();
+ String uniqueId = getMessageItemValue(item.getUniqueId(), event, ProtocolKey.UNIQUE_ID);
+ String ttl = getMessageItemValue(item.getTtl(), event, ProtocolKey.TTL);
String topic = StringUtils.isEmpty(batchMessage.getTopic()) ? event.getSubject() : batchMessage.getTopic();
String producerGroup = StringUtils.isEmpty(batchMessage.getProducerGroup())
- ? event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : batchMessage.getProducerGroup();
- String ttl = StringUtils.isEmpty(item.getTtl()) ? event.getExtension(ProtocolKey.TTL).toString() : item.getTtl();
+ ? Objects.requireNonNull(event.getExtension(ProtocolKey.PRODUCERGROUP)).toString() : batchMessage.getProducerGroup();
+
+ CloudEventBuilder eventBuilder = builderCloudEventBuilder(header, event);
- CloudEventBuilder eventBuilder;
- if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
- eventBuilder = CloudEventBuilder.v1(event);
- } else {
- eventBuilder = CloudEventBuilder.v03(event);
- }
eventBuilder.withSubject(topic)
- .withExtension(ProtocolKey.ENV, env)
- .withExtension(ProtocolKey.IDC, idc)
- .withExtension(ProtocolKey.IP, ip)
- .withExtension(ProtocolKey.PID, pid)
- .withExtension(ProtocolKey.SYS, sys)
- .withExtension(ProtocolKey.USERNAME, username)
- .withExtension(ProtocolKey.PASSWD, passwd)
- .withExtension(ProtocolKey.LANGUAGE, language)
- .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
- .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
- .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
@@ -217,9 +159,44 @@ public class GrpcMessageProtocolResolver {
}
return cloudEvents;
}
-
- private static String getEventExtension(CloudEvent event, String protocolKey) {
- return Objects.requireNonNull(event.getExtension(protocolKey)).toString();
+
+ private static String getHeaderValue(String value, CloudEvent event, String key) {
+ return StringUtils.isEmpty(value) ? Objects.requireNonNull(event.getExtension(key)).toString() : value;
+ }
+
+ private static String getMessageItemValue(String value, CloudEvent event, String key) {
+ return StringUtils.isEmpty(value) ? Objects.requireNonNull(event.getExtension(key)).toString() : value;
+ }
+
+ private static CloudEventBuilder builderCloudEventBuilder(RequestHeader header, CloudEvent event) {
+ String env = getHeaderValue(header.getEnv(), event, ProtocolKey.ENV);
+ String idc = getHeaderValue(header.getIdc(), event, ProtocolKey.IDC);
+ String ip = getHeaderValue(header.getIp(), event, ProtocolKey.IP);
+ String pid = getHeaderValue(header.getPid(), event, ProtocolKey.PID);
+ String sys = getHeaderValue(header.getSys(), event, ProtocolKey.SYS);
+ String language = getHeaderValue(header.getLanguage(), event, ProtocolKey.LANGUAGE);
+ String protocolType = getHeaderValue(header.getProtocolType(), event, ProtocolKey.PROTOCOL_TYPE);
+ String protocolDesc = getHeaderValue(header.getProtocolDesc(), event, ProtocolKey.PROTOCOL_DESC);
+ String protocolVersion = getHeaderValue(header.getProtocolVersion(), event, ProtocolKey.PROTOCOL_VERSION);
+ String username = getHeaderValue(header.getUsername(), event, ProtocolKey.USERNAME);
+ String passwd = getHeaderValue(header.getPassword(), event, ProtocolKey.PASSWD);
+
+ CloudEventBuilder eventBuilder = StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)
+ ? CloudEventBuilder.v1(event) : CloudEventBuilder.v03(event);
+
+ return eventBuilder
+ .withExtension(ProtocolKey.ENV, env)
+ .withExtension(ProtocolKey.IDC, idc)
+ .withExtension(ProtocolKey.IP, ip)
+ .withExtension(ProtocolKey.PID, pid)
+ .withExtension(ProtocolKey.SYS, sys)
+ .withExtension(ProtocolKey.USERNAME, username)
+ .withExtension(ProtocolKey.PASSWD, passwd)
+ .withExtension(ProtocolKey.LANGUAGE, language)
+ .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
+ .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
+ .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion);
+
}
-
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org