You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/02 11:14:58 UTC

[camel-kafka-connector] 01/03: Added support for multiple topics on the source connector side

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch multiple-topics
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e6de1f11af649c9d47789c486d2c23b753fa5445
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Sep 2 12:35:26 2020 +0200

    Added support for multiple topics on the source connector side
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 92 ++++++++++------------
 1 file changed, 43 insertions(+), 49 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 36aa96e..f48446f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -22,6 +22,7 @@ import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.LinkedList;
@@ -57,11 +58,11 @@ public class CamelSourceTask extends SourceTask {
 
     private static final String LOCAL_URL = "direct:end";
 
-
     private CamelMainSupport cms;
     private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
     private String topic;
+    private List<String> topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
     private String camelMessageHeaderKey;
@@ -94,14 +95,15 @@ public class CamelSourceTask extends SourceTask {
                 dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
             }
             topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
+            topics = Arrays.asList(topic.split(","));
 
             String localUrl = getLocalUrlWithPollingOptions(config);
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
-                        actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
-                        CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps,
+                                                config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+                                                CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
             cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, dataformats, 10, 500, camelContext);
@@ -123,48 +125,40 @@ public class CamelSourceTask extends SourceTask {
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-
-        while (collectedRecords < maxBatchPollSize
-                && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
-
-            if (exchange != null) {
-                LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
-                        exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
-
-                // TODO: see if there is a better way to use sourcePartition an sourceOffset
-                Map<String, String> sourcePartition = Collections.singletonMap("filename",
-                        exchange.getFromEndpoint().toString());
-                Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
-
-                final Object messageHeaderKey = camelMessageHeaderKey != null
-                        ? exchange.getMessage().getHeader(camelMessageHeaderKey)
-                        : null;
-                final Object messageBodyValue = exchange.getMessage().getBody();
-
-                final Schema messageKeySchema = messageHeaderKey != null
-                        ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey)
-                        : null;
-                final Schema messageBodySchema = messageBodyValue != null
-                        ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue)
-                        : null;
-
-                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema,
-                        messageHeaderKey, messageBodySchema, messageBodyValue);
-                if (exchange.getMessage().hasHeaders()) {
-                    setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+            while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
+                Exchange exchange = consumer.receiveNoWait();
+
+                if (exchange != null) {
+                    LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
+
+                    // TODO: see if there is a better way to use sourcePartition
+                    // and sourceOffset
+                    Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+                    Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+
+                    final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+                    final Object messageBodyValue = exchange.getMessage().getBody();
+
+                    final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+                    final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+                    for (String singleTopic : topics) {
+                    SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
+                    if (exchange.getMessage().hasHeaders()) {
+                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                    }
+                    if (exchange.hasProperties()) {
+                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                    }
+
+                    TaskHelper.logRecordContent(LOG, record, config);
+                    records.add(record);
+                    collectedRecords++;
+                    }
+                } else {
+                    break;
                 }
-                if (exchange.hasProperties()) {
-                    setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
-                }
-
-                TaskHelper.logRecordContent(LOG, record, config);
-                records.add(record);
-                collectedRecords++;
-            } else {
-                break;
             }
-        }
 
         if (records.isEmpty()) {
             return Collections.EMPTY_LIST;
@@ -172,7 +166,6 @@ public class CamelSourceTask extends SourceTask {
             return records;
         }
 
-
     }
 
     @Override
@@ -189,8 +182,9 @@ public class CamelSourceTask extends SourceTask {
         }
         try {
             /*
-              If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar
-              issues) then it won't be assigned and de-referencing it could cause an NPE.
+             * If the CamelMainSupport instance fails to be instantiated (e.g.,
+             * due to missing classes or similar issues) then it won't be
+             * assigned and de-referencing it could cause an NPE.
              */
             if (cms != null) {
                 cms.stop();
@@ -264,8 +258,8 @@ public class CamelSourceTask extends SourceTask {
         long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
         long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
         boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
-        return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout="
-                + pollingConsumerBlockTimeout + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
+        return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
+               + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
     }
 
     public CamelMainSupport getCms() {