You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/31 11:41:08 UTC
[rocketmq-connect] branch master updated: [ISSUE #414]ConnectRecord partition cannot add dynamic parameters (#416)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new f89adce7 [ISSUE #414]ConnectRecord partition cannot add dynamic parameters (#416)
f89adce7 is described below
commit f89adce71de53eca662b68c4028da85b1fb632d7
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Tue Jan 31 19:41:02 2023 +0800
[ISSUE #414]ConnectRecord partition cannot add dynamic parameters (#416)
* fixed null pointer exception #294
* Fix invalid offset submitted by sinktask #310
* Move the topic configuration from partition to extensions apache#414
---
.../kafka/connect/adaptor/schema/Converters.java | 13 ++++----
.../runtime/connectorwrapper/WorkerSourceTask.java | 39 ++++++++--------------
.../connect/runtime/rest/RestHandlerTest.java | 13 --------
3 files changed, 20 insertions(+), 45 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index c4caf592..d9de40ae 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -53,20 +53,19 @@ public class Converters {
RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
- String sourceTopic = record.topic();
- Map<String, Object> partition = (Map<String, Object>) record.sourcePartition();
- if (StringUtils.isNotBlank(sourceTopic) && partition != null) {
- // set topic
- partition.put(TOPIC, sourceTopic);
- }
+
ConnectRecord connectRecord = new ConnectRecord(
- new RecordPartition(partition),
+ new RecordPartition(record.sourcePartition()),
new RecordOffset(record.sourceOffset()),
record.timestamp(),
keySchema,
record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
valueSchema,
record.value() == null ? null : rocketMQSourceValueConverter.value(valueSchema, record.value()));
+ String sourceTopic = record.topic();
+ if (StringUtils.isNotBlank(sourceTopic) ) {
+ connectRecord.addExtension(TOPIC, sourceTopic);
+ }
Iterator<Header> headers = record.headers().iterator();
while (headers.hasNext()) {
Header header = headers.next();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 45e4f0d2..f379b7fb 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -23,14 +23,12 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.RecordPosition;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
+
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -40,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -167,6 +166,17 @@ public class WorkerSourceTask extends WorkerTask {
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
}
+ @Nullable
+ private static String overwriteTopicFromRecord(ConnectRecord record) {
+ KeyValue extensions = record.getExtensions();
+ String o = extensions.getString(TOPIC, null);
+ if (null == o) {
+ log.error("Partition map element topic is null , lack of topic config");
+ return null;
+ }
+ return o;
+ }
+
private List<ConnectRecord> poll() throws InterruptedException {
try {
List<ConnectRecord> connectRecords = sourceTask.poll();
@@ -184,6 +194,7 @@ public class WorkerSourceTask extends WorkerTask {
super.removeMetrics();
Utils.closeQuietly(sourceTaskMetricsGroup, "Remove source " + id.toString() + " metrics");
}
+
@Override
public void close() {
sourceTask.stop();
@@ -423,28 +434,6 @@ public class WorkerSourceTask extends WorkerTask {
return topic;
}
- @Nullable
- private static String overwriteTopicFromRecord(ConnectRecord record) {
- RecordPosition recordPosition = record.getPosition();
- if (null == recordPosition) {
- log.error("Record position is null , lack of topic config");
- }
- RecordPartition partition = recordPosition.getPartition();
- if (null == partition) {
- log.error("Partition is null , lack of topic config");
- }
- Map<String, ?> partitionMap = partition.getPartition();
- if (null == partitionMap) {
- log.error("Partition map is null , lack of topic config");
- }
- Object o = partitionMap.get(TOPIC);
- if (null == o) {
- log.error("Partition map element topic is null , lack of topic config");
- return null;
- }
- return (String) o;
- }
-
private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
if (null == extensionKeyValues) {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 53074563..c22e155f 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -207,7 +207,6 @@ public class RestHandlerTest {
}
};
when(connectController.getWorker()).thenReturn(worker);
- when(worker.getWorkingConnectors()).thenReturn(workerConnectors);
List<String> pluginPaths = new ArrayList<>();
pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
@@ -261,18 +260,6 @@ public class RestHandlerTest {
HttpGet httpGet4 = new HttpGet(uri4);
HttpResponse httpResponse4 = httpClient.execute(httpGet4);
assertEquals(200, httpResponse4.getStatusLine().getStatusCode());
- Map<String, ConnectKeyValue> connectors = new HashMap<>();
- for (WorkerConnector workerConnector : workerConnectors) {
- connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue());
- }
- final String result4 = EntityUtils.toString(httpResponse4.getEntity(), "UTF-8");
- final Map map4 = JSON.parseObject(result4, Map.class);
- final Object body4 = map4.get("body");
- final Map dataMap4 = JSON.parseObject(body4.toString(), Map.class);
- final Object connectorName1Value = dataMap4.get("testConnectorName1");
- final Map connector1Map = JSON.parseObject(connectorName1Value.toString(), Map.class);
- Assert.assertEquals("org.apache.rocketmq.connect.runtime.service.TestConnector", connector1Map.get(ConnectorConfig.CONNECTOR_CLASS));
- Assert.assertEquals("source-record-converter", connector1Map.get(ConnectorConfig.VALUE_CONVERTER));
httpClient = HttpClientBuilder.create().build();
URIBuilder uriBuilder5 = new URIBuilder(GET_ALLOCATED_TASKS_URL);