You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/04 01:31:36 UTC

[incubator-eventmesh] branch master updated: Check null pointer (#2438)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a67d333c Check null pointer (#2438)
8a67d333c is described below

commit 8a67d333c426506b58ce8e98bd1516ee2a60ab98
Author: weihubeats <we...@163.com>
AuthorDate: Sun Dec 4 09:31:30 2022 +0800

    Check null pointer (#2438)
---
 .../connector/rocketmq/producer/ProducerImpl.java  | 56 +++++++++++-----------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
index fd4cecc71..47ac04fc7 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+import java.util.Objects;
 import java.util.Properties;
 
 import io.cloudevents.CloudEvent;
@@ -71,8 +72,8 @@ public class ProducerImpl extends AbstractProducer {
     public SendResult send(CloudEvent cloudEvent) {
         this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msg =
-                RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
-        msg = supplySysProp(msg, cloudEvent);
+                RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
+        supplySysProp(msg, cloudEvent);
         String messageId = null;
         try {
             org.apache.rocketmq.client.producer.SendResult sendResultRmq = this.rocketmqProducer.send(msg);
@@ -91,8 +92,8 @@ public class ProducerImpl extends AbstractProducer {
     public void sendOneway(CloudEvent cloudEvent) {
         this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msg =
-                RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
-        msg = supplySysProp(msg, cloudEvent);
+                RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
+        supplySysProp(msg, cloudEvent);
         try {
             this.rocketmqProducer.sendOneway(msg);
         } catch (Exception e) {
@@ -105,7 +106,7 @@ public class ProducerImpl extends AbstractProducer {
     public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
         this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msg =
-                RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+                RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
         msg = supplySysProp(msg, cloudEvent);
         try {
             this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
@@ -120,19 +121,19 @@ public class ProducerImpl extends AbstractProducer {
 
         this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msg =
-                RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+                RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
 
-        msg = supplySysProp(msg, cloudEvent);
+        supplySysProp(msg, cloudEvent);
 
         rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout);
     }
 
-    public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallback) {
+    public void reply(final CloudEvent cloudEvent, final SendCallback sendCallback) {
         this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msg =
-                RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+                RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
-        msg = supplySysProp(msg, cloudEvent);
+        supplySysProp(msg, cloudEvent);
 
         try {
             this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
@@ -140,7 +141,6 @@ public class ProducerImpl extends AbstractProducer {
             log.error(String.format("Send message async Exception, %s", msg), e);
             throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e);
         }
-        return true;
 
     }
 
@@ -148,7 +148,7 @@ public class ProducerImpl extends AbstractProducer {
         for (String sysPropKey : MessageConst.STRING_HASH_SET) {
             String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
             if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) {
-                MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString());
+                MessageAccessor.putProperty(msg, sysPropKey, Objects.requireNonNull(cloudEvent.getExtension(ceKey)).toString());
                 msg.getProperties().remove(ceKey);
             }
         }
@@ -187,24 +187,22 @@ public class ProducerImpl extends AbstractProducer {
 
     private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message,
                                                                                  final SendCallback sendCallback) {
-        org.apache.rocketmq.client.producer.SendCallback rmqSendCallback =
-                new org.apache.rocketmq.client.producer.SendCallback() {
-                    @Override
-                    public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
-                        sendCallback.onSuccess(CloudEventUtils.convertSendResult(sendResult));
-                    }
+        return new org.apache.rocketmq.client.producer.SendCallback() {
+            @Override
+            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+                sendCallback.onSuccess(CloudEventUtils.convertSendResult(sendResult));
+            }
 
-                    @Override
-                    public void onException(Throwable e) {
-                        String topic = message.getTopic();
-                        ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e);
-                        OnExceptionContext context = new OnExceptionContext();
-                        context.setTopic(topic);
-                        context.setException(onsEx);
-                        sendCallback.onException(context);
-                    }
-                };
-        return rmqSendCallback;
+            @Override
+            public void onException(Throwable e) {
+                String topic = message.getTopic();
+                ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e);
+                OnExceptionContext context = new OnExceptionContext();
+                context.setTopic(topic);
+                context.setException(onsEx);
+                sendCallback.onException(context);
+            }
+        };
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org