You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:15 UTC

[rocketmq-connect] 05/39: return taskConfig with source-record-converter (#373)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 25e699e49745b43bb0c8c07bae29c630cdf73b63
Author: mike_xwm <mi...@126.com>
AuthorDate: Fri Aug 9 21:17:17 2019 +0800

    return taskConfig with source-record-converter (#373)
---
 src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java | 3 ++-
 .../java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
index d8aa08a..fca4dcc 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
@@ -27,7 +27,8 @@ public enum TaskConfigEnum {
     TASK_BROKER_NAME("brokerName"),
     TASK_QUEUE_ID("queueId"),
     TASK_NEXT_POSITION("nextPosition"),
-    TASK_TOPIC_INFO("taskTopicList");
+    TASK_TOPIC_INFO("taskTopicList"),
+    TASK_SOURCE_RECORD_CONVERTER("source-record-converter");
 
     private String key;
 
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
index 53699a9..9b019f1 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
@@ -49,6 +49,7 @@ public class DivideTaskByTopic extends TaskDivideStrategy {
             keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
             keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
             keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
             config.add(keyValue);
         }