You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/31 11:41:08 UTC

[rocketmq-connect] branch master updated: [ISSUE #414]ConnectRecord partition cannot add dynamic parameters (#416)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new f89adce7 [ISSUE #414]ConnectRecord partition cannot add dynamic parameters  (#416)
f89adce7 is described below

commit f89adce71de53eca662b68c4028da85b1fb632d7
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Tue Jan 31 19:41:02 2023 +0800

    [ISSUE #414]ConnectRecord partition cannot add dynamic parameters  (#416)
    
    * fixed null pointer exception #294
    
    * Fix invalid offset submitted by sinktask #310
    
    * Move the topic configuration from partition to extensions apache#414
---
 .../kafka/connect/adaptor/schema/Converters.java   | 13 ++++----
 .../runtime/connectorwrapper/WorkerSourceTask.java | 39 ++++++++--------------
 .../connect/runtime/rest/RestHandlerTest.java      | 13 --------
 3 files changed, 20 insertions(+), 45 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index c4caf592..d9de40ae 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -53,20 +53,19 @@ public class Converters {
 
 
         RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
-        String sourceTopic = record.topic();
-        Map<String, Object> partition = (Map<String, Object>) record.sourcePartition();
-        if (StringUtils.isNotBlank(sourceTopic) && partition != null) {
-            // set topic
-            partition.put(TOPIC, sourceTopic);
-        }
+
         ConnectRecord connectRecord = new ConnectRecord(
-                new RecordPartition(partition),
+                new RecordPartition(record.sourcePartition()),
                 new RecordOffset(record.sourceOffset()),
                 record.timestamp(),
                 keySchema,
                 record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
                 valueSchema,
                 record.value() == null ? null : rocketMQSourceValueConverter.value(valueSchema, record.value()));
+        String sourceTopic = record.topic();
+        if (StringUtils.isNotBlank(sourceTopic) ) {
+            connectRecord.addExtension(TOPIC, sourceTopic);
+        }
         Iterator<Header> headers = record.headers().iterator();
         while (headers.hasNext()) {
             Header header = headers.next();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 45e4f0d2..f379b7fb 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -23,14 +23,12 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.RecordPosition;
 import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
+
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -40,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -167,6 +166,17 @@ public class WorkerSourceTask extends WorkerTask {
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
     }
 
+    @Nullable
+    private static String overwriteTopicFromRecord(ConnectRecord record) {
+        KeyValue extensions = record.getExtensions(); // may be null; putExtendMsgProperty null-checks it too
+        String topic = null == extensions ? null : extensions.getString(TOPIC, null);
+        if (null == topic) { // a missing extensions holder and a missing TOPIC key are reported alike
+            log.error("Extensions element topic is null , lack of topic config");
+            return null;
+        }
+        return topic;
+    }
+
     private List<ConnectRecord> poll() throws InterruptedException {
         try {
             List<ConnectRecord> connectRecords = sourceTask.poll();
@@ -184,6 +194,7 @@ public class WorkerSourceTask extends WorkerTask {
         super.removeMetrics();
         Utils.closeQuietly(sourceTaskMetricsGroup, "Remove source " + id.toString() + " metrics");
     }
+
     @Override
     public void close() {
         sourceTask.stop();
@@ -423,28 +434,6 @@ public class WorkerSourceTask extends WorkerTask {
         return topic;
     }
 
-    @Nullable
-    private static String overwriteTopicFromRecord(ConnectRecord record) {
-        RecordPosition recordPosition = record.getPosition();
-        if (null == recordPosition) {
-            log.error("Record position is null , lack of topic config");
-        }
-        RecordPartition partition = recordPosition.getPartition();
-        if (null == partition) {
-            log.error("Partition is null , lack of topic config");
-        }
-        Map<String, ?> partitionMap = partition.getPartition();
-        if (null == partitionMap) {
-            log.error("Partition map is null , lack of topic config");
-        }
-        Object o = partitionMap.get(TOPIC);
-        if (null == o) {
-            log.error("Partition map element topic is null , lack of topic config");
-            return null;
-        }
-        return (String) o;
-    }
-
     private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
         KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
         if (null == extensionKeyValues) {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index 53074563..c22e155f 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -207,7 +207,6 @@ public class RestHandlerTest {
             }
         };
         when(connectController.getWorker()).thenReturn(worker);
-        when(worker.getWorkingConnectors()).thenReturn(workerConnectors);
 
         List<String> pluginPaths = new ArrayList<>();
         pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
@@ -261,18 +260,6 @@ public class RestHandlerTest {
         HttpGet httpGet4 = new HttpGet(uri4);
         HttpResponse httpResponse4 = httpClient.execute(httpGet4);
         assertEquals(200, httpResponse4.getStatusLine().getStatusCode());
-        Map<String, ConnectKeyValue> connectors = new HashMap<>();
-        for (WorkerConnector workerConnector : workerConnectors) {
-            connectors.put(workerConnector.getConnectorName(), workerConnector.getKeyValue());
-        }
-        final String result4 = EntityUtils.toString(httpResponse4.getEntity(), "UTF-8");
-        final Map map4 = JSON.parseObject(result4, Map.class);
-        final Object body4 = map4.get("body");
-        final Map dataMap4 = JSON.parseObject(body4.toString(), Map.class);
-        final Object connectorName1Value = dataMap4.get("testConnectorName1");
-        final Map connector1Map = JSON.parseObject(connectorName1Value.toString(), Map.class);
-        Assert.assertEquals("org.apache.rocketmq.connect.runtime.service.TestConnector", connector1Map.get(ConnectorConfig.CONNECTOR_CLASS));
-        Assert.assertEquals("source-record-converter", connector1Map.get(ConnectorConfig.VALUE_CONVERTER));
 
         httpClient = HttpClientBuilder.create().build();
         URIBuilder uriBuilder5 = new URIBuilder(GET_ALLOCATED_TASKS_URL);