You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/26 08:48:38 UTC

[rocketmq-connect] branch master updated: [ISSUE #337]bug fix (#391)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f16616 [ISSUE #337]bug fix (#391)
74f16616 is described below

commit 74f16616f6381183aa4c7c4924bf98776bd83264
Author: sinrimin <as...@moesrc.com>
AuthorDate: Mon Dec 26 16:48:33 2022 +0800

    [ISSUE #337]bug fix (#391)
    
    * bug fix
    
    * [ISSUE #400] Create subscription group before use
---
 connectors/rocketmq-connect-http/pom.xml           |  2 +-
 .../connect/http/sink/HttpSinkConnector.java       | 20 ++++++------------
 .../rocketmq/connect/http/sink/HttpSinkTask.java   | 24 ++--------------------
 .../connect/http/sink/HttpSinkConnectorTest.java   |  2 +-
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 14 ++++++++-----
 .../converter/record/json/JsonConverterConfig.java |  2 +-
 .../service/ConfigManagementServiceImpl.java       |  6 ++++++
 .../service/PositionManagementServiceImpl.java     |  6 ++++++
 .../service/StateManagementServiceImpl.java        |  6 ++++++
 9 files changed, 38 insertions(+), 44 deletions(-)

diff --git a/connectors/rocketmq-connect-http/pom.xml b/connectors/rocketmq-connect-http/pom.xml
index 7bbadfe0..ba8a8b85 100644
--- a/connectors/rocketmq-connect-http/pom.xml
+++ b/connectors/rocketmq-connect-http/pom.xml
@@ -18,7 +18,7 @@
         <junit.version>4.13.1</junit.version>
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>2.6.3</mockito.version>
-        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+        <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
         <okhttp.version>3.9.1</okhttp.version>
         <fastjson.version>1.2.83</fastjson.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
index 54c7efe5..5a9193fc 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
@@ -14,23 +14,15 @@ import java.util.List;
 
 public class HttpSinkConnector extends SinkConnector {
 
-    private String url;
-
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
+    private KeyValue connectConfig;
 
     @Override
     public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> keyValueList = new ArrayList<>(11);
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(HttpConstant.URL_CONSTANT, url);
+        for (String key : connectConfig.keySet()) {
+            keyValue.put(key, connectConfig.getString(key));
+        }
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -56,8 +48,8 @@ public class HttpSinkConnector extends SinkConnector {
     }
 
     @Override
-    public void init(KeyValue config) {
-        url = config.getString(HttpConstant.URL_CONSTANT);
+    public void start(KeyValue config) {
+        this.connectConfig = config;
     }
 
     @Override
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
index 603bafaf..f561b654 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
@@ -2,7 +2,6 @@ package org.apache.rocketmq.connect.http.sink;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
@@ -31,27 +30,8 @@ public class HttpSinkTask extends SinkTask {
     }
 
     @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
-
-    @Override
-    public void validate(KeyValue config) {
-    }
-
-    @Override
-    public void init(KeyValue config) {
-        url = config.getString(HttpConstant.URL_CONSTANT);
-    }
-
-    @Override
-    public void start(SinkTaskContext sinkTaskContext) {
-        super.start(sinkTaskContext);
+    public void start(KeyValue keyValue) {
+        url = keyValue.getString(HttpConstant.URL_CONSTANT);
     }
 
     @Override
diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
index d75e4454..ac9efead 100644
--- a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
+++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
@@ -24,7 +24,7 @@ public class HttpSinkConnectorTest {
         HttpSinkTask httpSinkTask = new HttpSinkTask();
         KeyValue keyValue = new DefaultKeyValue();
         keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1:8081/demo");
-        httpSinkTask.init(keyValue);
+        httpSinkTask.start(keyValue);
         List<ConnectRecord> connectRecordList = new ArrayList<>();
         ConnectRecord connectRecord = new ConnectRecord(null ,null, System.currentTimeMillis());
         connectRecord.setData("test");
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 665ee9cb..ee135cbd 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -516,9 +516,8 @@ public class WorkerSinkTask extends WorkerTask {
     }
 
     private ConnectRecord convertMessages(MessageExt message) {
-        Map<String, String> properties = message.getProperties();
         // timestamp
-        String connectTimestamp = properties.get(ConnectorConfig.CONNECT_TIMESTAMP);
+        String connectTimestamp = message.getProperties().get(ConnectorConfig.CONNECT_TIMESTAMP);
         Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : message.getBornTimestamp();
 
         // partition and offset
@@ -558,11 +557,12 @@ public class WorkerSinkTask extends WorkerTask {
         }
 
         // add extension
-        addExtension(properties, record);
-        return record;
+        addExtension(message, transformedRecord);
+        return transformedRecord;
     }
 
-    private void addExtension(Map<String, String> properties, ConnectRecord sinkDataEntry) {
+    private void addExtension(MessageExt message, ConnectRecord sinkDataEntry) {
+        Map<String, String> properties = message.getProperties();
         KeyValue keyValue = new DefaultKeyValue();
         if (MapUtils.isNotEmpty(properties)) {
             for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -575,6 +575,10 @@ public class WorkerSinkTask extends WorkerTask {
                 }
             }
         }
+
+        // add msgId to extension
+        keyValue.put("MQ-SYS-MSG_ID", message.getMsgId());
+
         sinkDataEntry.addExtension(keyValue);
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
index 65df25eb..d3167b8f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
@@ -46,7 +46,7 @@ public class JsonConverterConfig {
     public JsonConverterConfig(Map<String, ?> props) {
         // schema.enabled
         if (props.containsKey(SCHEMAS_ENABLE_CONFIG)) {
-            this.schemasEnabled = (Boolean) props.get(SCHEMAS_ENABLE_CONFIG);
+            this.schemasEnabled = Boolean.parseBoolean(String.valueOf(props.get(SCHEMAS_ENABLE_CONFIG)));
         } else {
             this.schemasEnabled = SCHEMAS_ENABLE_DEFAULT;
         }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 9fc07534..b7aba212 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -199,6 +199,12 @@ public class ConfigManagementServiceImpl extends AbstractConfigManagementService
      * @param connectConfig
      */
     private void prepare(WorkerConfig connectConfig) {
+        String consumerGroup = ConnectUtil.createGroupName(configManagePrefix, connectConfig.getWorkerId());
+        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+        if (!consumerGroupSet.contains(consumerGroup)) {
+            log.info("try to create consumerGroup: {}!", consumerGroup);
+            ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+        }
         String configStoreTopic = connectConfig.getConfigStoreTopic();
         if (!ConnectUtil.isTopicExist(connectConfig, configStoreTopic)) {
             log.info("try to create config store topic: {}!", configStoreTopic);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 0f3ac2fa..cb0af9b6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -121,6 +121,12 @@ public class PositionManagementServiceImpl implements PositionManagementService
      * @param connectConfig
      */
     private void prepare(WorkerConfig connectConfig) {
+        String consumerGroup = ConnectUtil.createGroupName(positionManagePrefix, connectConfig.getWorkerId());
+        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+        if (!consumerGroupSet.contains(consumerGroup)) {
+            log.info("try to create consumerGroup: {}!", consumerGroup);
+            ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+        }
         String positionStoreTopic = connectConfig.getPositionStoreTopic();
         if (!ConnectUtil.isTopicExist(connectConfig, positionStoreTopic)) {
             log.info("try to create position store topic: {}!", positionStoreTopic);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 74f0de72..b0b10f47 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -105,6 +105,12 @@ public class StateManagementServiceImpl implements StateManagementService {
      * @param connectConfig
      */
     private void prepare(WorkerConfig connectConfig) {
+        String consumerGroup = ConnectUtil.createGroupName(statusManagePrefix, connectConfig.getWorkerId());
+        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+        if (!consumerGroupSet.contains(consumerGroup)) {
+            log.info("try to create consumerGroup: {}!", consumerGroup);
+            ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+        }
         String connectStatusTopic = connectConfig.getConnectStatusTopic();
         if (!ConnectUtil.isTopicExist(connectConfig, connectStatusTopic)) {
             log.info("try to create status topic: {}!", connectStatusTopic);