You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2022/12/23 09:09:49 UTC

[rocketmq-eventbridge] 01/02: updateSinkConnectorRequest

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 8e82b26b8a0d6ac648530afe25ac13739be83a3e
Author: Artisan <Am...@gmail.com>
AuthorDate: Thu Dec 22 22:50:56 2022 +0800

    updateSinkConnectorRequest
---
 .../adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java   | 7 ++++---
 .../adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
index 6edc0cc..6f5c468 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSinkConnectorRequest.java
@@ -53,8 +53,9 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest {
 
     public Map<String, Object> getRequestObject() {
         Map<String, Object> config = Maps.newHashMap();
-        config.put("connector-class", connectorClass);
-        config.put("connect-topicname", topicName);
+        config.put("connector.class", connectorClass);
+        config.put("connect.topicnames", topicName);
+        String sinPrefix = ".";
         config.put("transforms", String.join(",", transforms.stream()
             .map(TransformRequest::getName)
             .collect(Collectors.toList())));
@@ -62,7 +63,7 @@ public class CreateSinkConnectorRequest extends BaseConnectorRequest {
             transform.getConfig()
                 .entrySet()
                 .forEach(entry -> {
-                    config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue());
+                    config.put("transforms" + sinPrefix + transform.getName() + sinPrefix + entry.getKey(), entry.getValue());
                 });
         });
         config.putAll(connectorConfig);
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
index 40ffd38..65dd520 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/dto/CreateSourceConnectorRequest.java
@@ -53,8 +53,9 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest {
 
     public Map<String, Object> getRequestObject() {
         Map<String, Object> config = Maps.newHashMap();
-        config.put("connector-class", connectorClass);
-        config.put("connect-topicname", topicName);
+        config.put("connector.class", connectorClass);
+        config.put("connect.topicname", topicName);
+        String sourcePrefix = ".";
         config.put("transforms", String.join(",", transforms.stream()
             .map(TransformRequest::getName)
             .collect(Collectors.toList())));
@@ -62,7 +63,7 @@ public class CreateSourceConnectorRequest extends BaseConnectorRequest {
             transform.getConfig()
                 .entrySet()
                 .forEach(entry -> {
-                    config.put("transforms" + "-" + transform.getName() + "-" + entry.getKey(), entry.getValue());
+                    config.put("transforms" + sourcePrefix + transform.getName() + sourcePrefix + entry.getKey(), entry.getValue());
                 });
         });
         config.putAll(connectorConfig);