You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/02 11:14:58 UTC
[camel-kafka-connector] 01/03: Added support for multiple topics on
the source connector side
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch multiple-topics
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6de1f11af649c9d47789c486d2c23b753fa5445
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Sep 2 12:35:26 2020 +0200
Added support for multiple topics on the source connector side
---
.../camel/kafkaconnector/CamelSourceTask.java | 92 ++++++++++------------
1 file changed, 43 insertions(+), 49 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 36aa96e..f48446f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -22,6 +22,7 @@ import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
@@ -57,11 +58,11 @@ public class CamelSourceTask extends SourceTask {
private static final String LOCAL_URL = "direct:end";
-
private CamelMainSupport cms;
private CamelSourceConnectorConfig config;
private PollingConsumer consumer;
private String topic;
+ private List<String> topics;
private Long maxBatchPollSize;
private Long maxPollDuration;
private String camelMessageHeaderKey;
@@ -94,14 +95,15 @@ public class CamelSourceTask extends SourceTask {
dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
}
topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
+ topics = Arrays.asList(topic.split(","));
String localUrl = getLocalUrlWithPollingOptions(config);
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
- actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
- CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+ remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps,
+ config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+ CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, dataformats, 10, 500, camelContext);
@@ -123,48 +125,40 @@ public class CamelSourceTask extends SourceTask {
long collectedRecords = 0L;
List<SourceRecord> records = new ArrayList<>();
-
- while (collectedRecords < maxBatchPollSize
- && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
- Exchange exchange = consumer.receiveNoWait();
-
- if (exchange != null) {
- LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
- exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
-
- // TODO: see if there is a better way to use sourcePartition an sourceOffset
- Map<String, String> sourcePartition = Collections.singletonMap("filename",
- exchange.getFromEndpoint().toString());
- Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
-
- final Object messageHeaderKey = camelMessageHeaderKey != null
- ? exchange.getMessage().getHeader(camelMessageHeaderKey)
- : null;
- final Object messageBodyValue = exchange.getMessage().getBody();
-
- final Schema messageKeySchema = messageHeaderKey != null
- ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey)
- : null;
- final Schema messageBodySchema = messageBodyValue != null
- ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue)
- : null;
-
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema,
- messageHeaderKey, messageBodySchema, messageBodyValue);
- if (exchange.getMessage().hasHeaders()) {
- setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+ while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
+ Exchange exchange = consumer.receiveNoWait();
+
+ if (exchange != null) {
+ LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
+
+ // TODO: see if there is a better way to use sourcePartition
+ // an sourceOffset
+ Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+ Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+
+ final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+ final Object messageBodyValue = exchange.getMessage().getBody();
+
+ final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+ final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+ for (String singleTopic : topics) {
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
+ if (exchange.getMessage().hasHeaders()) {
+ setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+ }
+ if (exchange.hasProperties()) {
+ setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+ }
+
+ TaskHelper.logRecordContent(LOG, record, config);
+ records.add(record);
+ collectedRecords++;
+ }
+ } else {
+ break;
}
- if (exchange.hasProperties()) {
- setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
- }
-
- TaskHelper.logRecordContent(LOG, record, config);
- records.add(record);
- collectedRecords++;
- } else {
- break;
}
- }
if (records.isEmpty()) {
return Collections.EMPTY_LIST;
@@ -172,7 +166,6 @@ public class CamelSourceTask extends SourceTask {
return records;
}
-
}
@Override
@@ -189,8 +182,9 @@ public class CamelSourceTask extends SourceTask {
}
try {
/*
- If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar
- issues) then it won't be assigned and de-referencing it could cause an NPE.
+ * If the CamelMainSupport instance fails to be instantiated (ie.:
+ * due to missing classes or similar issues) then it won't be
+ * assigned and de-referencing it could cause an NPE.
*/
if (cms != null) {
cms.stop();
@@ -264,8 +258,8 @@ public class CamelSourceTask extends SourceTask {
long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF);
long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF);
boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF);
- return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout="
- + pollingConsumerBlockTimeout + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
+ return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout
+ + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull;
}
public CamelMainSupport getCms() {