You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/11 07:59:06 UTC

[rocketmq-connect] branch master updated: [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea7062  [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)
2ea7062 is described below

commit 2ea7062f268297c7ff166df41483ffe57566bd30
Author: zhangjidi2016 <zh...@cmss.chinamobile.com>
AuthorDate: Wed May 11 15:59:01 2022 +0800

    [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)
    
    * [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix
    
    * static list of properties instead of enumerating
    
    Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
 .../runtime/connectorwrapper/WorkerSourceTask.java     | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 36a10fd..955d770 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -32,6 +32,7 @@ import io.openmessaging.connector.api.errors.RetriableException;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +45,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -112,6 +114,16 @@ public class WorkerSourceTask implements WorkerTask {
 
     private TransformChain<ConnectRecord> transformChain;
 
+    /**
+     * Message properties whose keys are in WHITE_KEY_SET are stored under their original key, without the 'connect-ext-' prefix.
+     */
+    private static final Set<String> WHITE_KEY_SET = new HashSet<>();
+
+    static {
+        WHITE_KEY_SET.add(MessageConst.PROPERTY_KEYS);
+        WHITE_KEY_SET.add(MessageConst.PROPERTY_TAGS);
+    }
+
     public WorkerSourceTask(String connectorName,
         SourceTask sourceTask,
         ConnectKeyValue taskConfig,
@@ -351,7 +363,11 @@ public class WorkerSourceTask implements WorkerTask {
             return;
         }
         for (String key : keySet) {
-            MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
+            if (WHITE_KEY_SET.contains(key)) {
+                MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
+            } else {
+                MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
+            }
         }
     }