You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/26 08:48:38 UTC
[rocketmq-connect] branch master updated: [ISSUE #337]bug fix (#391)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 74f16616 [ISSUE #337]bug fix (#391)
74f16616 is described below
commit 74f16616f6381183aa4c7c4924bf98776bd83264
Author: sinrimin <as...@moesrc.com>
AuthorDate: Mon Dec 26 16:48:33 2022 +0800
[ISSUE #337]bug fix (#391)
* bug fix
* [ISSUE #400] Create subscription group before use
---
connectors/rocketmq-connect-http/pom.xml | 2 +-
.../connect/http/sink/HttpSinkConnector.java | 20 ++++++------------
.../rocketmq/connect/http/sink/HttpSinkTask.java | 24 ++--------------------
.../connect/http/sink/HttpSinkConnectorTest.java | 2 +-
.../runtime/connectorwrapper/WorkerSinkTask.java | 14 ++++++++-----
.../converter/record/json/JsonConverterConfig.java | 2 +-
.../service/ConfigManagementServiceImpl.java | 6 ++++++
.../service/PositionManagementServiceImpl.java | 6 ++++++
.../service/StateManagementServiceImpl.java | 6 ++++++
9 files changed, 38 insertions(+), 44 deletions(-)
diff --git a/connectors/rocketmq-connect-http/pom.xml b/connectors/rocketmq-connect-http/pom.xml
index 7bbadfe0..ba8a8b85 100644
--- a/connectors/rocketmq-connect-http/pom.xml
+++ b/connectors/rocketmq-connect-http/pom.xml
@@ -18,7 +18,7 @@
<junit.version>4.13.1</junit.version>
<assertj.version>2.6.0</assertj.version>
<mockito.version>2.6.3</mockito.version>
- <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
<okhttp.version>3.9.1</okhttp.version>
<fastjson.version>1.2.83</fastjson.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
index 54c7efe5..5a9193fc 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnector.java
@@ -14,23 +14,15 @@ import java.util.List;
public class HttpSinkConnector extends SinkConnector {
- private String url;
-
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
+ private KeyValue connectConfig;
@Override
public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> keyValueList = new ArrayList<>(11);
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(HttpConstant.URL_CONSTANT, url);
+ for (String key : connectConfig.keySet()) {
+ keyValue.put(key, connectConfig.getString(key));
+ }
keyValueList.add(keyValue);
return keyValueList;
}
@@ -56,8 +48,8 @@ public class HttpSinkConnector extends SinkConnector {
}
@Override
- public void init(KeyValue config) {
- url = config.getString(HttpConstant.URL_CONSTANT);
+ public void start(KeyValue config) {
+ this.connectConfig = config;
}
@Override
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
index 603bafaf..f561b654 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
@@ -2,7 +2,6 @@ package org.apache.rocketmq.connect.http.sink;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
-import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
@@ -31,27 +30,8 @@ public class HttpSinkTask extends SinkTask {
}
@Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
-
- @Override
- public void validate(KeyValue config) {
- }
-
- @Override
- public void init(KeyValue config) {
- url = config.getString(HttpConstant.URL_CONSTANT);
- }
-
- @Override
- public void start(SinkTaskContext sinkTaskContext) {
- super.start(sinkTaskContext);
+ public void start(KeyValue keyValue) {
+ url = keyValue.getString(HttpConstant.URL_CONSTANT);
}
@Override
diff --git a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
index d75e4454..ac9efead 100644
--- a/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
+++ b/connectors/rocketmq-connect-http/src/test/java/org/apache/rocketmq/connect/http/sink/HttpSinkConnectorTest.java
@@ -24,7 +24,7 @@ public class HttpSinkConnectorTest {
HttpSinkTask httpSinkTask = new HttpSinkTask();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(HttpConstant.URL_CONSTANT, "http://127.0.0.1:8081/demo");
- httpSinkTask.init(keyValue);
+ httpSinkTask.start(keyValue);
List<ConnectRecord> connectRecordList = new ArrayList<>();
ConnectRecord connectRecord = new ConnectRecord(null ,null, System.currentTimeMillis());
connectRecord.setData("test");
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 665ee9cb..ee135cbd 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -516,9 +516,8 @@ public class WorkerSinkTask extends WorkerTask {
}
private ConnectRecord convertMessages(MessageExt message) {
- Map<String, String> properties = message.getProperties();
// timestamp
- String connectTimestamp = properties.get(ConnectorConfig.CONNECT_TIMESTAMP);
+ String connectTimestamp = message.getProperties().get(ConnectorConfig.CONNECT_TIMESTAMP);
Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : message.getBornTimestamp();
// partition and offset
@@ -558,11 +557,12 @@ public class WorkerSinkTask extends WorkerTask {
}
// add extension
- addExtension(properties, record);
- return record;
+ addExtension(message, transformedRecord);
+ return transformedRecord;
}
- private void addExtension(Map<String, String> properties, ConnectRecord sinkDataEntry) {
+ private void addExtension(MessageExt message, ConnectRecord sinkDataEntry) {
+ Map<String, String> properties = message.getProperties();
KeyValue keyValue = new DefaultKeyValue();
if (MapUtils.isNotEmpty(properties)) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -575,6 +575,10 @@ public class WorkerSinkTask extends WorkerTask {
}
}
}
+
+ // add msgId to extension
+ keyValue.put("MQ-SYS-MSG_ID", message.getMsgId());
+
sinkDataEntry.addExtension(keyValue);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
index 65df25eb..d3167b8f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverterConfig.java
@@ -46,7 +46,7 @@ public class JsonConverterConfig {
public JsonConverterConfig(Map<String, ?> props) {
// schema.enabled
if (props.containsKey(SCHEMAS_ENABLE_CONFIG)) {
- this.schemasEnabled = (Boolean) props.get(SCHEMAS_ENABLE_CONFIG);
+ this.schemasEnabled = Boolean.parseBoolean(String.valueOf(props.get(SCHEMAS_ENABLE_CONFIG)));
} else {
this.schemasEnabled = SCHEMAS_ENABLE_DEFAULT;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 9fc07534..b7aba212 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -199,6 +199,12 @@ public class ConfigManagementServiceImpl extends AbstractConfigManagementService
* @param connectConfig
*/
private void prepare(WorkerConfig connectConfig) {
+ String consumerGroup = ConnectUtil.createGroupName(configManagePrefix, connectConfig.getWorkerId());
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ if (!consumerGroupSet.contains(consumerGroup)) {
+ log.info("try to create consumerGroup: {}!", consumerGroup);
+ ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+ }
String configStoreTopic = connectConfig.getConfigStoreTopic();
if (!ConnectUtil.isTopicExist(connectConfig, configStoreTopic)) {
log.info("try to create config store topic: {}!", configStoreTopic);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 0f3ac2fa..cb0af9b6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -121,6 +121,12 @@ public class PositionManagementServiceImpl implements PositionManagementService
* @param connectConfig
*/
private void prepare(WorkerConfig connectConfig) {
+ String consumerGroup = ConnectUtil.createGroupName(positionManagePrefix, connectConfig.getWorkerId());
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ if (!consumerGroupSet.contains(consumerGroup)) {
+ log.info("try to create consumerGroup: {}!", consumerGroup);
+ ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+ }
String positionStoreTopic = connectConfig.getPositionStoreTopic();
if (!ConnectUtil.isTopicExist(connectConfig, positionStoreTopic)) {
log.info("try to create position store topic: {}!", positionStoreTopic);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 74f0de72..b0b10f47 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -105,6 +105,12 @@ public class StateManagementServiceImpl implements StateManagementService {
* @param connectConfig
*/
private void prepare(WorkerConfig connectConfig) {
+ String consumerGroup = ConnectUtil.createGroupName(statusManagePrefix, connectConfig.getWorkerId());
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ if (!consumerGroupSet.contains(consumerGroup)) {
+ log.info("try to create consumerGroup: {}!", consumerGroup);
+ ConnectUtil.createSubGroup(connectConfig, consumerGroup);
+ }
String connectStatusTopic = connectConfig.getConnectStatusTopic();
if (!ConnectUtil.isTopicExist(connectConfig, connectStatusTopic)) {
log.info("try to create status topic: {}!", connectStatusTopic);