You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/01 10:00:36 UTC
[incubator-eventmesh] branch master updated: Simplify CloudEventUtils code (#2323)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new bc6fe3c0e Simplify CloudEventUtils code (#2323)
bc6fe3c0e is described below
commit bc6fe3c0e9f874e6a48c8800e4e172e4441a7394
Author: weihubeats <we...@163.com>
AuthorDate: Thu Dec 1 18:00:30 2022 +0800
Simplify CloudEventUtils code (#2323)
---
.../connector/rocketmq/utils/CloudEventUtils.java | 84 +++++++++++-----------
1 file changed, 41 insertions(+), 43 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
index dbf1175e9..85eb0e0a7 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
@@ -26,13 +26,14 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
public class CloudEventUtils {
public static SendResult convertSendResult(
- org.apache.rocketmq.client.producer.SendResult rmqResult) {
+ org.apache.rocketmq.client.producer.SendResult rmqResult) {
SendResult sendResult = new SendResult();
sendResult.setTopic(rmqResult.getMessageQueue().getTopic());
sendResult.setMessageId(rmqResult.getMsgId());
@@ -42,53 +43,39 @@ public class CloudEventUtils {
public static Message msgConvert(MessageExt rmqMsg) {
Message message = new Message();
- if (rmqMsg.getTopic() != null) {
- message.setTopic(rmqMsg.getTopic());
- }
-
- if (rmqMsg.getKeys() != null) {
- message.setKeys(rmqMsg.getKeys());
- }
-
- if (rmqMsg.getTags() != null) {
- message.setTags(rmqMsg.getTags());
- }
-
+ initProperty(rmqMsg, message, MessageExt::getTopic, Message::setTopic);
+ initProperty(rmqMsg, message, MessageExt::getKeys, Message::setKeys);
+ initProperty(rmqMsg, message, MessageExt::getTags, Message::setTags);
if (rmqMsg.getBody() != null) {
message.setBody(rmqMsg.getBody());
}
-
- final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
-
- for (final Map.Entry<String, String> entry : entries) {
- MessageAccessor.putProperty(message, entry.getKey(), entry.getValue());
- }
+ rmqMsg.getProperties().forEach((k, v) -> MessageAccessor.putProperty(message, k, v));
if (rmqMsg.getMsgId() != null) {
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_MESSAGE_ID),
- rmqMsg.getMsgId());
+ rmqMsg.getMsgId());
}
if (rmqMsg.getTopic() != null) {
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_DESTINATION),
- rmqMsg.getTopic());
+ rmqMsg.getTopic());
}
//
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_BORN_HOST),
- String.valueOf(rmqMsg.getBornHost()));
+ String.valueOf(rmqMsg.getBornHost()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP),
- String.valueOf(rmqMsg.getBornTimestamp()));
+ String.valueOf(rmqMsg.getBornTimestamp()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_HOST),
- String.valueOf(rmqMsg.getStoreHost()));
+ String.valueOf(rmqMsg.getStoreHost()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_TIMESTAMP),
- String.valueOf(rmqMsg.getStoreTimestamp()));
+ String.valueOf(rmqMsg.getStoreTimestamp()));
//use in manual ack
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID),
- String.valueOf(rmqMsg.getQueueId()));
+ String.valueOf(rmqMsg.getQueueId()));
MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET),
- String.valueOf(rmqMsg.getQueueOffset()));
+ String.valueOf(rmqMsg.getQueueOffset()));
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
@@ -110,15 +97,10 @@ public class CloudEventUtils {
public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) {
org.apache.rocketmq.common.message.MessageExt rmqMessageExt =
- new org.apache.rocketmq.common.message.MessageExt();
+ new org.apache.rocketmq.common.message.MessageExt();
try {
- if (message.getKeys() != null) {
- rmqMessageExt.setKeys(message.getKeys());
- }
- if (message.getTags() != null) {
- rmqMessageExt.setTags(message.getTags());
- }
-
+ initProperty(message, rmqMessageExt, Message::getKeys, Message::setKeys);
+ initProperty(message, rmqMessageExt, Message::getTags, Message::setTags);
if (message.getBody() != null) {
rmqMessageExt.setBody(message.getBody());
@@ -129,16 +111,14 @@ public class CloudEventUtils {
rmqMessageExt.setTopic(message.getTopic());
int queueId =
- Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
+ Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
long queueOffset = Long.parseLong(
- message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
+ message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
//use in manual ack
rmqMessageExt.setQueueId(queueId);
rmqMessageExt.setQueueOffset(queueOffset);
- Map<String, String> properties = message.getProperties();
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- MessageAccessor.putProperty(rmqMessageExt, entry.getKey(), entry.getValue());
- }
+
+ message.getProperties().forEach((k, v) -> MessageAccessor.putProperty(rmqMessageExt, k, v));
} catch (Exception e) {
e.printStackTrace();
}
@@ -146,5 +126,23 @@ public class CloudEventUtils {
}
+ /**
+ * Copies a String property from the source to the target, but only when the value read from the source is non-null
+ *
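+ * @param source object the property value is read from
+ * @param target object the property value is written to
+ * @param function getter applied to the source to obtain the String value
+ * @param biConsumer setter invoked with the target and the value when the value is non-null
+ * @param <T> type of the source
+ * @param <V> type of the target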
+ */
+ private static <T, V> void initProperty(T source, V target, Function<T, String> function, BiConsumer<V, String> biConsumer) {
+ String apply = function.apply(source);
+ if (Objects.nonNull(apply)) {
+ biConsumer.accept(target, apply);
+ }
+
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org