You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/17 12:16:17 UTC
(camel) branch main updated: CAMEL-16064: preliminary refactoring of camel-kafka (#12820)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 8232b52fa89 CAMEL-16064: preliminary refactoring of camel-kafka (#12820)
8232b52fa89 is described below
commit 8232b52fa8944f286283c32c97f86cefc81265b4
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Jan 17 13:16:11 2024 +0100
CAMEL-16064: preliminary refactoring of camel-kafka (#12820)
This reorganizes streaming-related processing code to simplify the implementation of the batch processing code
---
.../camel/component/kafka/KafkaFetchRecords.java | 3 +-
.../AbstractKafkaRecordProcessorFacade.java | 90 ++++++++++++
.../consumer/support/KafkaRecordProcessor.java | 159 +--------------------
.../support/KafkaRecordProcessorFacade.java | 142 ++----------------
.../kafka/consumer/support/ProcessingResult.java | 11 +-
.../classic/OffsetPartitionAssignmentAdapter.java | 11 +-
.../KafkaRecordStreamingProcessor.java} | 73 +++++-----
.../KafkaRecordStreamingProcessorFacade.java} | 108 ++++++--------
.../kafka/KafkaIdempotentRepository.java | 4 +-
9 files changed, 208 insertions(+), 393 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2c32e727a50..f0e84f4e41b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -31,6 +31,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFac
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
+import org.apache.camel.component.kafka.consumer.support.streaming.KafkaRecordStreamingProcessorFacade;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
@@ -320,7 +321,7 @@ public class KafkaFetchRecords implements Runnable {
LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
}
- KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
+ KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordStreamingProcessorFacade(
kafkaConsumer, threadId, commitManager, consumerListener);
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java
new file mode 100644
index 00000000000..96706a6a6a2
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/AbstractKafkaRecordProcessorFacade.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support;
+
+import java.util.List;
+
+import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common code for processing consumer records retrieved from Kafka
+ */
+public abstract class AbstractKafkaRecordProcessorFacade implements KafkaRecordProcessorFacade {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaRecordProcessorFacade.class);
+ protected final KafkaConsumer camelKafkaConsumer;
+ protected final String threadId;
+ protected final CommitManager commitManager;
+ protected final KafkaConsumerListener consumerListener;
+
+ protected AbstractKafkaRecordProcessorFacade(
+ KafkaConsumer camelKafkaConsumer, String threadId, CommitManager commitManager,
+ KafkaConsumerListener consumerListener) {
+ this.camelKafkaConsumer = camelKafkaConsumer;
+ this.threadId = threadId;
+ this.commitManager = commitManager;
+ this.consumerListener = consumerListener;
+ }
+
+ /**
+ * Whether the the Camel consumer is stopping
+ *
+ * @return true if is stopping or false otherwise
+ */
+ protected boolean isStopping() {
+ return camelKafkaConsumer.isStopping();
+ }
+
+ /**
+ * Utility to log record information along with partition
+ *
+ * @param partitionRecords records from partition
+ * @param partition topic/partition information
+ */
+ protected void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
+ partition);
+ }
+ }
+
+ /**
+ * Utility to log record information
+ *
+ * @param allRecords records retrieved from Kafka
+ */
+ protected void logRecords(ConsumerRecords<Object, Object> allRecords) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
+ }
+ }
+
+ protected void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
+ consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
+ }
+ }
+
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index b06e53493e6..121c250b815 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -14,161 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
-import java.util.stream.StreamSupport;
+package org.apache.camel.component.kafka.consumer.support;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.CommitManager;
-import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
-import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
-import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaRecordProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
-
- private final boolean autoCommitEnabled;
- private final KafkaConfiguration configuration;
- private final Processor processor;
- private final CommitManager commitManager;
-
- public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
- this.autoCommitEnabled = configuration.isAutoCommitEnable();
- this.configuration = configuration;
- this.processor = processor;
- this.commitManager = commitManager;
- }
-
- private void setupExchangeMessage(Message message, ConsumerRecord record) {
- message.setHeader(KafkaConstants.PARTITION, record.partition());
- message.setHeader(KafkaConstants.TOPIC, record.topic());
- message.setHeader(KafkaConstants.OFFSET, record.offset());
- message.setHeader(KafkaConstants.HEADERS, record.headers());
- message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
- message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
-
- if (record.key() != null) {
- message.setHeader(KafkaConstants.KEY, record.key());
- }
-
- LOG.debug("Setting up the exchange for message from partition {} and offset {}",
- record.partition(), record.offset());
-
- message.setBody(record.value());
- }
-
- private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
- return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
- }
-
- private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
-
- HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
- KafkaHeaderDeserializer headerDeserializer = configuration.getHeaderDeserializer();
-
- StreamSupport.stream(record.headers().spliterator(), false)
- .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
- .forEach(header -> exchange.getIn().setHeader(header.key(),
- headerDeserializer.deserialize(header.key(), header.value())));
- }
-
- public ProcessingResult processExchange(
- Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
- boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
-
- Message message = exchange.getMessage();
-
- setupExchangeMessage(message, record);
-
- propagateHeaders(record, exchange);
-
- // if not auto commit then we have additional information on the exchange
- if (!autoCommitEnabled) {
- message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordHasNext);
- message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
- }
-
- if (configuration.isAllowManualCommit()) {
- // allow Camel users to access the Kafka consumer API to be able to do for example manual commits
- KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, record);
-
- message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
- message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
- }
-
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- ProcessingResult result = ProcessingResult.newUnprocessed();
- if (exchange.getException() != null) {
- LOG.debug("An exception was thrown for record at partition {} and offset {}",
- record.partition(), record.offset());
-
- boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
- result = new ProcessingResult(breakOnErrorExit, true);
- } else {
- result = new ProcessingResult(false, exchange.getException() != null);
- }
-
- if (!result.isBreakOnErrorHit()) {
- commitManager.recordOffset(topicPartition, record.offset());
- }
-
- return result;
- }
-
- private boolean processException(
- Exchange exchange, TopicPartition topicPartition,
- ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
-
- // processing failed due to an unhandled exception, what should we do
- if (configuration.isBreakOnFirstError()) {
- // we are failing and we should break out
- if (LOG.isWarnEnabled()) {
- Exception exc = exchange.getException();
- LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
- exc.getMessage());
- LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
- consumerRecord.offset(), consumerRecord.partition());
- }
-
- // we should just do a commit (vs the original forceCommit)
- // when route uses NOOP Commit Manager it will rely
- // on the route implementation to explicitly commit offset
- // when route uses Synch/Asynch Commit Manager it will
- // ALWAYS commit the offset for the failing record
- // and will ALWAYS retry it
- commitManager.commit(topicPartition);
-
- // continue to next partition
- return true;
- } else {
- // will handle/log the exception and then continue to next
- exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
- }
-
- return false;
- }
-
- public static String serializeOffsetKey(TopicPartition topicPartition) {
- return topicPartition.topic() + '/' + topicPartition.partition();
- }
- public static long deserializeOffsetValue(String offset) {
- return Long.parseLong(offset);
- }
+public interface KafkaRecordProcessor<T> {
+ ProcessingResult processExchange(
+ KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext,
+ boolean recordHasNext, T processable);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 8965a1037c4..41a747b0ef9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -17,137 +17,17 @@
package org.apache.camel.component.kafka.consumer.support;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaConsumer;
-import org.apache.camel.component.kafka.consumer.CommitManager;
-import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaRecordProcessorFacade {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
-
- private final KafkaConsumer camelKafkaConsumer;
- private final String threadId;
- private final KafkaRecordProcessor kafkaRecordProcessor;
- private final CommitManager commitManager;
- private final KafkaConsumerListener consumerListener;
-
- public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
- CommitManager commitManager, KafkaConsumerListener consumerListener) {
- this.camelKafkaConsumer = camelKafkaConsumer;
- this.threadId = threadId;
- this.commitManager = commitManager;
-
- kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
- this.consumerListener = consumerListener;
- }
-
- private boolean isStopping() {
- return camelKafkaConsumer.isStopping();
- }
-
- public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
- logRecords(allRecords);
-
- ProcessingResult result = ProcessingResult.newUnprocessed();
-
- Set<TopicPartition> partitions = allRecords.partitions();
- Iterator<TopicPartition> partitionIterator = partitions.iterator();
-
- LOG.debug("Poll received records on {} partitions", partitions.size());
-
- while (partitionIterator.hasNext() && !isStopping()) {
- TopicPartition partition = partitionIterator.next();
-
- LOG.debug("Processing records on partition {}", partition.partition());
-
- List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
- Iterator<ConsumerRecord<Object, Object>> recordIterator = partitionRecords.iterator();
-
- logRecordsInPartition(partitionRecords, partition);
-
- while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
- ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
-
- LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
- consumerRecord.offset());
- result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
- kafkaRecordProcessor, consumerRecord);
-
- LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(),
- consumerRecord.offset());
-
- if (consumerListener != null) {
- if (!consumerListener.afterProcess(result)) {
- commitManager.commit(partition);
- return result;
- }
- }
- }
-
- if (!result.isBreakOnErrorHit()) {
- LOG.debug("Committing offset on successful execution");
- // all records processed from partition so commit them
- commitManager.commit(partition);
- }
- }
-
- return result;
- }
-
- private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
- partition);
- }
- }
-
- private void logRecords(ConsumerRecords<Object, Object> allRecords) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
- }
- }
-
- private ProcessingResult processRecord(
- TopicPartition partition,
- boolean partitionHasNext,
- boolean recordHasNext,
- KafkaRecordProcessor kafkaRecordProcessor,
- ConsumerRecord<Object, Object> consumerRecord) {
-
- logRecord(consumerRecord);
-
- Exchange exchange = camelKafkaConsumer.createExchange(false);
-
- ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
- recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
-
- // success so release the exchange
- camelKafkaConsumer.releaseExchange(exchange, false);
-
- return result;
- }
-
- private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
- consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
- }
- }
-
- private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
- return new KafkaRecordProcessor(
- camelKafkaConsumer.getEndpoint().getConfiguration(),
- camelKafkaConsumer.getProcessor(),
- commitManager);
- }
+/**
+ * A processing facade that allows processing consumer records in different ways
+ */
+public interface KafkaRecordProcessorFacade {
+ /**
+ * Sends a set of records polled from Kafka for processing
+ *
+ * @param allRecords All records received from a call to the Kafka's consumer poll method
+ * @return The result of processing this set of records
+ */
+ ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 1728616cf9e..4be6b8ba7eb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,6 +17,9 @@
package org.apache.camel.component.kafka.consumer.support;
+/**
+ * Holds the result of processing one or more consumer records
+ */
public final class ProcessingResult {
private static final ProcessingResult UNPROCESSED_RESULT
= new ProcessingResult(false, false);
@@ -24,7 +27,13 @@ public final class ProcessingResult {
private final boolean breakOnErrorHit;
private final boolean failed;
- ProcessingResult(boolean breakOnErrorHit, boolean failed) {
+ /**
+ * Constructs a new processing result
+ *
+ * @param breakOnErrorHit break on error hit setting
+ * @param failed whether processing has failed
+ */
+ public ProcessingResult(boolean breakOnErrorHit, boolean failed) {
this.breakOnErrorHit = breakOnErrorHit;
this.failed = failed;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
index 4227ad7f004..a851659b815 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
@@ -24,9 +24,6 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
-import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
-
/**
* A resume strategy that uses Kafka's offset for resuming
*/
@@ -62,4 +59,12 @@ public class OffsetPartitionAssignmentAdapter implements PartitionAssignmentAdap
}
}
}
+
+ public static String serializeOffsetKey(TopicPartition topicPartition) {
+ return topicPartition.topic() + '/' + topicPartition.partition();
+ }
+
+ public static long deserializeOffsetValue(String offset) {
+ return Long.parseLong(offset);
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
similarity index 71%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
index b06e53493e6..2cf0ca267a3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.streaming;
import java.util.stream.StreamSupport;
@@ -23,8 +23,11 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.HeaderFilterStrategy;
@@ -34,64 +37,67 @@ import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaRecordProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class);
+final class KafkaRecordStreamingProcessor implements KafkaRecordProcessor<ConsumerRecord<Object, Object>> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessor.class);
private final boolean autoCommitEnabled;
private final KafkaConfiguration configuration;
private final Processor processor;
private final CommitManager commitManager;
- public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
+ public KafkaRecordStreamingProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) {
this.autoCommitEnabled = configuration.isAutoCommitEnable();
this.configuration = configuration;
this.processor = processor;
this.commitManager = commitManager;
}
- private void setupExchangeMessage(Message message, ConsumerRecord record) {
- message.setHeader(KafkaConstants.PARTITION, record.partition());
- message.setHeader(KafkaConstants.TOPIC, record.topic());
- message.setHeader(KafkaConstants.OFFSET, record.offset());
- message.setHeader(KafkaConstants.HEADERS, record.headers());
- message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
- message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
+ private void setupExchangeMessage(Message message, ConsumerRecord<?, ?> consumerRecord) {
+ message.setHeader(KafkaConstants.PARTITION, consumerRecord.partition());
+ message.setHeader(KafkaConstants.TOPIC, consumerRecord.topic());
+ message.setHeader(KafkaConstants.OFFSET, consumerRecord.offset());
+ message.setHeader(KafkaConstants.HEADERS, consumerRecord.headers());
+ message.setHeader(KafkaConstants.TIMESTAMP, consumerRecord.timestamp());
+ message.setHeader(Exchange.MESSAGE_TIMESTAMP, consumerRecord.timestamp());
- if (record.key() != null) {
- message.setHeader(KafkaConstants.KEY, record.key());
+ if (consumerRecord.key() != null) {
+ message.setHeader(KafkaConstants.KEY, consumerRecord.key());
}
LOG.debug("Setting up the exchange for message from partition {} and offset {}",
- record.partition(), record.offset());
+ consumerRecord.partition(), consumerRecord.offset());
- message.setBody(record.value());
+ message.setBody(consumerRecord.value());
}
private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
return !headerFilterStrategy.applyFilterToExternalHeaders(header.key(), header.value(), exchange);
}
- private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange) {
+ private void propagateHeaders(ConsumerRecord<Object, Object> consumerRecord, Exchange exchange) {
HeaderFilterStrategy headerFilterStrategy = configuration.getHeaderFilterStrategy();
KafkaHeaderDeserializer headerDeserializer = configuration.getHeaderDeserializer();
- StreamSupport.stream(record.headers().spliterator(), false)
+ StreamSupport.stream(consumerRecord.headers().spliterator(), false)
.filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
.forEach(header -> exchange.getIn().setHeader(header.key(),
headerDeserializer.deserialize(header.key(), header.value())));
}
+ @Override
public ProcessingResult processExchange(
- Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
- boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
+ KafkaConsumer camelKafkaConsumer, TopicPartition topicPartition, boolean partitionHasNext,
+ boolean recordHasNext, ConsumerRecord<Object, Object> consumerRecord) {
+
+ final Exchange exchange = camelKafkaConsumer.createExchange(false);
+ final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
Message message = exchange.getMessage();
- setupExchangeMessage(message, record);
+ setupExchangeMessage(message, consumerRecord);
- propagateHeaders(record, exchange);
+ propagateHeaders(consumerRecord, exchange);
// if not auto commit then we have additional information on the exchange
if (!autoCommitEnabled) {
@@ -101,7 +107,7 @@ public class KafkaRecordProcessor {
if (configuration.isAllowManualCommit()) {
// allow Camel users to access the Kafka consumer API to be able to do for example manual commits
- KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, record);
+ KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext);
@@ -113,21 +119,24 @@ public class KafkaRecordProcessor {
exchange.setException(e);
}
- ProcessingResult result = ProcessingResult.newUnprocessed();
+ ProcessingResult result;
if (exchange.getException() != null) {
- LOG.debug("An exception was thrown for record at partition {} and offset {}",
- record.partition(), record.offset());
+ LOG.debug("An exception was thrown for consumerRecord at partition {} and offset {}",
+ consumerRecord.partition(), consumerRecord.offset());
- boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+ boolean breakOnErrorExit = processException(exchange, topicPartition, consumerRecord, exceptionHandler);
result = new ProcessingResult(breakOnErrorExit, true);
} else {
result = new ProcessingResult(false, exchange.getException() != null);
}
if (!result.isBreakOnErrorHit()) {
- commitManager.recordOffset(topicPartition, record.offset());
+ commitManager.recordOffset(topicPartition, consumerRecord.offset());
}
+ // success so release the exchange
+ camelKafkaConsumer.releaseExchange(exchange, false);
+
return result;
}
@@ -163,12 +172,4 @@ public class KafkaRecordProcessor {
return false;
}
-
- public static String serializeOffsetKey(TopicPartition topicPartition) {
- return topicPartition.topic() + '/' + topicPartition.partition();
- }
-
- public static long deserializeOffsetValue(String offset) {
- return Long.parseLong(offset);
- }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
similarity index 59%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
index 8965a1037c4..0e6aa6ace08 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.java
@@ -15,45 +15,65 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.streaming;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
+import org.apache.camel.component.kafka.consumer.support.AbstractKafkaRecordProcessorFacade;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaRecordProcessorFacade {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
+public class KafkaRecordStreamingProcessorFacade extends AbstractKafkaRecordProcessorFacade {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessorFacade.class);
+ private final KafkaRecordStreamingProcessor kafkaRecordProcessor;
- private final KafkaConsumer camelKafkaConsumer;
- private final String threadId;
- private final KafkaRecordProcessor kafkaRecordProcessor;
- private final CommitManager commitManager;
- private final KafkaConsumerListener consumerListener;
-
- public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
- CommitManager commitManager, KafkaConsumerListener consumerListener) {
- this.camelKafkaConsumer = camelKafkaConsumer;
- this.threadId = threadId;
- this.commitManager = commitManager;
+ public KafkaRecordStreamingProcessorFacade(
+ KafkaConsumer camelKafkaConsumer, String threadId,
+ CommitManager commitManager, KafkaConsumerListener consumerListener) {
+ super(camelKafkaConsumer, threadId, commitManager, consumerListener);
kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
- this.consumerListener = consumerListener;
+
}
- private boolean isStopping() {
- return camelKafkaConsumer.isStopping();
+ private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
+ return new KafkaRecordStreamingProcessor(
+ camelKafkaConsumer.getEndpoint().getConfiguration(),
+ camelKafkaConsumer.getProcessor(),
+ commitManager);
+ }
+
+ /**
+ * Process a single record retrieved from Kafka
+ *
+ * @param partition the partition
+ * @param partitionHasNext whether there are more partitions to process
+ * @param recordHasNext whether more records to be processed exist in that partition
+ * @param kafkaRecordProcessor the record processor
+ * @param consumerRecord the consumer record retrieved from Kafka to process
+ * @return The result of processing this set of records
+ */
+ private ProcessingResult processRecord(
+ TopicPartition partition, boolean partitionHasNext, boolean recordHasNext,
+ KafkaRecordStreamingProcessor kafkaRecordProcessor,
+ ConsumerRecord<Object, Object> consumerRecord) {
+
+ logRecord(consumerRecord);
+
+ return kafkaRecordProcessor.processExchange(camelKafkaConsumer, partition, partitionHasNext,
+ recordHasNext, consumerRecord);
}
+ @Override
public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
logRecords(allRecords);
@@ -77,13 +97,15 @@ public class KafkaRecordProcessorFacade {
while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
- LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
+ LOG.debug("Processing record on partition {} with offset {}",
+ consumerRecord.partition(),
consumerRecord.offset());
result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
kafkaRecordProcessor, consumerRecord);
- LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(),
+ LOG.debug("Processed record on partition {} with offset {}",
+ consumerRecord.partition(),
consumerRecord.offset());
if (consumerListener != null) {
@@ -104,50 +126,4 @@ public class KafkaRecordProcessorFacade {
return result;
}
- private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Records count {} received for partition {}", partitionRecords.size(),
- partition);
- }
- }
-
- private void logRecords(ConsumerRecords<Object, Object> allRecords) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Last poll on thread {} resulted on {} records to process", threadId, allRecords.count());
- }
- }
-
- private ProcessingResult processRecord(
- TopicPartition partition,
- boolean partitionHasNext,
- boolean recordHasNext,
- KafkaRecordProcessor kafkaRecordProcessor,
- ConsumerRecord<Object, Object> consumerRecord) {
-
- logRecord(consumerRecord);
-
- Exchange exchange = camelKafkaConsumer.createExchange(false);
-
- ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
- recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
-
- // success so release the exchange
- camelKafkaConsumer.releaseExchange(exchange, false);
-
- return result;
- }
-
- private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", consumerRecord.partition(),
- consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
- }
- }
-
- private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
- return new KafkaRecordProcessor(
- camelKafkaConsumer.getEndpoint().getConfiguration(),
- camelKafkaConsumer.getProcessor(),
- commitManager);
- }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index f3dc6f0eaeb..226fb1209b4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -306,8 +306,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the group id of the Kafka consumer.
*
- * @param groupId The poll duration in milliseconds.
- * @deprecated The parameter groupId is ignored.
+ * @param groupId The poll duration in milliseconds.
+ * @deprecated The parameter groupId is ignored.
*/
@Deprecated
public void setGroupId(String groupId) {