You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2022/12/01 10:00:36 UTC

[incubator-eventmesh] branch master updated: Simplify CloudEventUtils code (#2323)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6fe3c0e Simplify CloudEventUtils code (#2323)
bc6fe3c0e is described below

commit bc6fe3c0e9f874e6a48c8800e4e172e4441a7394
Author: weihubeats <we...@163.com>
AuthorDate: Thu Dec 1 18:00:30 2022 +0800

    Simplify CloudEventUtils code (#2323)
---
 .../connector/rocketmq/utils/CloudEventUtils.java  | 84 +++++++++++-----------
 1 file changed, 41 insertions(+), 43 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
index dbf1175e9..85eb0e0a7 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
@@ -26,13 +26,14 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 public class CloudEventUtils {
 
     public static SendResult convertSendResult(
-            org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        org.apache.rocketmq.client.producer.SendResult rmqResult) {
         SendResult sendResult = new SendResult();
         sendResult.setTopic(rmqResult.getMessageQueue().getTopic());
         sendResult.setMessageId(rmqResult.getMsgId());
@@ -42,53 +43,39 @@ public class CloudEventUtils {
 
     public static Message msgConvert(MessageExt rmqMsg) {
         Message message = new Message();
-        if (rmqMsg.getTopic() != null) {
-            message.setTopic(rmqMsg.getTopic());
-        }
-
-        if (rmqMsg.getKeys() != null) {
-            message.setKeys(rmqMsg.getKeys());
-        }
-
-        if (rmqMsg.getTags() != null) {
-            message.setTags(rmqMsg.getTags());
-        }
-
+        initProperty(rmqMsg, message, MessageExt::getTopic, Message::setTopic);
+        initProperty(rmqMsg, message, MessageExt::getKeys, Message::setKeys);
+        initProperty(rmqMsg, message, MessageExt::getTags, Message::setTags);
         if (rmqMsg.getBody() != null) {
             message.setBody(rmqMsg.getBody());
         }
-
-        final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
-
-        for (final Map.Entry<String, String> entry : entries) {
-            MessageAccessor.putProperty(message, entry.getKey(), entry.getValue());
-        }
+        rmqMsg.getProperties().forEach((k, v) -> MessageAccessor.putProperty(message, k, v));
 
         if (rmqMsg.getMsgId() != null) {
             MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_MESSAGE_ID),
-                    rmqMsg.getMsgId());
+                rmqMsg.getMsgId());
         }
 
         if (rmqMsg.getTopic() != null) {
             MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_DESTINATION),
-                    rmqMsg.getTopic());
+                rmqMsg.getTopic());
         }
 
         //
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_BORN_HOST),
-                String.valueOf(rmqMsg.getBornHost()));
+            String.valueOf(rmqMsg.getBornHost()));
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP),
-                String.valueOf(rmqMsg.getBornTimestamp()));
+            String.valueOf(rmqMsg.getBornTimestamp()));
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_HOST),
-                String.valueOf(rmqMsg.getStoreHost()));
+            String.valueOf(rmqMsg.getStoreHost()));
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_STORE_TIMESTAMP),
-                String.valueOf(rmqMsg.getStoreTimestamp()));
+            String.valueOf(rmqMsg.getStoreTimestamp()));
 
         //use in manual ack
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID),
-                String.valueOf(rmqMsg.getQueueId()));
+            String.valueOf(rmqMsg.getQueueId()));
         MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET),
-                String.valueOf(rmqMsg.getQueueOffset()));
+            String.valueOf(rmqMsg.getQueueOffset()));
 
         for (String sysPropKey : MessageConst.STRING_HASH_SET) {
             if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) {
@@ -110,15 +97,10 @@ public class CloudEventUtils {
     public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) {
 
         org.apache.rocketmq.common.message.MessageExt rmqMessageExt =
-                new org.apache.rocketmq.common.message.MessageExt();
+            new org.apache.rocketmq.common.message.MessageExt();
         try {
-            if (message.getKeys() != null) {
-                rmqMessageExt.setKeys(message.getKeys());
-            }
-            if (message.getTags() != null) {
-                rmqMessageExt.setTags(message.getTags());
-            }
-
+            initProperty(message, rmqMessageExt, Message::getKeys, Message::setKeys);
+            initProperty(message, rmqMessageExt, Message::getTags, Message::setTags);
 
             if (message.getBody() != null) {
                 rmqMessageExt.setBody(message.getBody());
@@ -129,16 +111,14 @@ public class CloudEventUtils {
             rmqMessageExt.setTopic(message.getTopic());
 
             int queueId =
-                    Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
+                Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID)));
             long queueOffset = Long.parseLong(
-                    message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
+                message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET)));
             //use in manual ack
             rmqMessageExt.setQueueId(queueId);
             rmqMessageExt.setQueueOffset(queueOffset);
-            Map<String, String> properties = message.getProperties();
-            for (final Map.Entry<String, String> entry : properties.entrySet()) {
-                MessageAccessor.putProperty(rmqMessageExt, entry.getKey(), entry.getValue());
-            }
+
+            message.getProperties().forEach((k, v) -> MessageAccessor.putProperty(rmqMessageExt, k, v));
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -146,5 +126,23 @@ public class CloudEventUtils {
 
     }
 
+    /**
+     * Copy one String value from source to target, skipping it when the getter returns null
+     *
+     * @param source     object the value is read from
+     * @param target     object the value is written to
+     * @param function   getter applied to source to obtain the value
+     * @param biConsumer setter called with target and the value, only when the value is non-null
+     * @param <T>        type of source
+     * @param <V>        type of target
+     */
+    private static <T, V> void initProperty(T source, V target, Function<T, String> function, BiConsumer<V, String> biConsumer) {
+        String apply = function.apply(source);
+        if (Objects.nonNull(apply)) {
+            biConsumer.accept(target, apply);
+        }
+
+    }
+
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org