You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/11 07:59:06 UTC
[rocketmq-connect] branch master updated: [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 2ea7062 [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)
2ea7062 is described below
commit 2ea7062f268297c7ff166df41483ffe57566bd30
Author: zhangjidi2016 <zh...@cmss.chinamobile.com>
AuthorDate: Wed May 11 15:59:01 2022 +0800
[ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix (#128)
* [ISSUE #127]when sync tag or key properties of message, remove the 'connect-ext-' prefix
* static list of properties instead of enumerating
Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
.../runtime/connectorwrapper/WorkerSourceTask.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 36a10fd..955d770 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -32,6 +32,7 @@ import io.openmessaging.connector.api.errors.RetriableException;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +45,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -112,6 +114,16 @@ public class WorkerSourceTask implements WorkerTask {
private TransformChain<ConnectRecord> transformChain;
+ /**
+ * Message properties whose keys are in WHITE_KEY_SET are stored as-is, without the 'connect-ext-' prefix
+ */
+ private static final Set<String> WHITE_KEY_SET = new HashSet<>();
+
+ static {
+ WHITE_KEY_SET.add(MessageConst.PROPERTY_KEYS);
+ WHITE_KEY_SET.add(MessageConst.PROPERTY_TAGS);
+ }
+
public WorkerSourceTask(String connectorName,
SourceTask sourceTask,
ConnectKeyValue taskConfig,
@@ -351,7 +363,11 @@ public class WorkerSourceTask implements WorkerTask {
return;
}
for (String key : keySet) {
- MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
+ if (WHITE_KEY_SET.contains(key)) {
+ MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
+ } else {
+ MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
+ }
}
}