You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/04 01:31:36 UTC
[incubator-eventmesh] branch master updated: Check null pointer (#2438)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 8a67d333c Check null pointer (#2438)
8a67d333c is described below
commit 8a67d333c426506b58ce8e98bd1516ee2a60ab98
Author: weihubeats <we...@163.com>
AuthorDate: Sun Dec 4 09:31:30 2022 +0800
Check null pointer (#2438)
---
.../connector/rocketmq/producer/ProducerImpl.java | 56 +++++++++++-----------
1 file changed, 27 insertions(+), 29 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
index fd4cecc71..47ac04fc7 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.Objects;
import java.util.Properties;
import io.cloudevents.CloudEvent;
@@ -71,8 +72,8 @@ public class ProducerImpl extends AbstractProducer {
public SendResult send(CloudEvent cloudEvent) {
this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
- RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
- msg = supplySysProp(msg, cloudEvent);
+ RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
+ supplySysProp(msg, cloudEvent);
String messageId = null;
try {
org.apache.rocketmq.client.producer.SendResult sendResultRmq = this.rocketmqProducer.send(msg);
@@ -91,8 +92,8 @@ public class ProducerImpl extends AbstractProducer {
public void sendOneway(CloudEvent cloudEvent) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
- RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
- msg = supplySysProp(msg, cloudEvent);
+ RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
+ supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.sendOneway(msg);
} catch (Exception e) {
@@ -105,7 +106,7 @@ public class ProducerImpl extends AbstractProducer {
public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
- RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+ RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
@@ -120,19 +121,19 @@ public class ProducerImpl extends AbstractProducer {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
- RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+ RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
- msg = supplySysProp(msg, cloudEvent);
+ supplySysProp(msg, cloudEvent);
rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout);
}
- public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallback) {
+ public void reply(final CloudEvent cloudEvent, final SendCallback sendCallback) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
- RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent);
+ RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
- msg = supplySysProp(msg, cloudEvent);
+ supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
@@ -140,7 +141,6 @@ public class ProducerImpl extends AbstractProducer {
log.error(String.format("Send message async Exception, %s", msg), e);
throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e);
}
- return true;
}
@@ -148,7 +148,7 @@ public class ProducerImpl extends AbstractProducer {
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR);
if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) {
- MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString());
+ MessageAccessor.putProperty(msg, sysPropKey, Objects.requireNonNull(cloudEvent.getExtension(ceKey)).toString());
msg.getProperties().remove(ceKey);
}
}
@@ -187,24 +187,22 @@ public class ProducerImpl extends AbstractProducer {
private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message,
final SendCallback sendCallback) {
- org.apache.rocketmq.client.producer.SendCallback rmqSendCallback =
- new org.apache.rocketmq.client.producer.SendCallback() {
- @Override
- public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
- sendCallback.onSuccess(CloudEventUtils.convertSendResult(sendResult));
- }
+ return new org.apache.rocketmq.client.producer.SendCallback() {
+ @Override
+ public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+ sendCallback.onSuccess(CloudEventUtils.convertSendResult(sendResult));
+ }
- @Override
- public void onException(Throwable e) {
- String topic = message.getTopic();
- ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e);
- OnExceptionContext context = new OnExceptionContext();
- context.setTopic(topic);
- context.setException(onsEx);
- sendCallback.onException(context);
- }
- };
- return rmqSendCallback;
+ @Override
+ public void onException(Throwable e) {
+ String topic = message.getTopic();
+ ConnectorRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, null, e);
+ OnExceptionContext context = new OnExceptionContext();
+ context.setTopic(topic);
+ context.setException(onsEx);
+ sendCallback.onException(context);
+ }
+ };
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org