You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/01 08:59:32 UTC
[camel] 01/02: CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3a74f8fc2d5969965710fa11bf1802c78f5eba4b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 1 09:55:45 2021 +0100
CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
---
.../camel/component/kafka/DefaultKafkaClientFactory.java | 8 ++++----
.../component/kafka/DefaultKafkaManualAsyncCommit.java | 4 ++--
.../kafka/DefaultKafkaManualAsyncCommitFactory.java | 4 ++--
.../camel/component/kafka/DefaultKafkaManualCommit.java | 8 ++++----
.../component/kafka/DefaultKafkaManualCommitFactory.java | 4 ++--
.../component/kafka/DefaultKafkaManualSyncCommit.java | 4 ++--
.../apache/camel/component/kafka/KafkaClientFactory.java | 14 +++++++-------
.../apache/camel/component/kafka/KafkaFetchRecords.java | 2 +-
.../camel/component/kafka/KafkaManualCommitFactory.java | 4 ++--
.../org/apache/camel/component/kafka/KafkaProducer.java | 6 +++---
.../consumer/support/KafkaConsumerResumeStrategy.java | 6 ++++--
.../kafka/consumer/support/KafkaRecordProcessor.java | 16 ++++++++--------
.../support/OffsetKafkaConsumerResumeStrategy.java | 7 ++++---
.../consumer/support/PartitionAssignmentListener.java | 6 +++---
.../kafka/consumer/support/ResumeStrategyFactory.java | 4 ++--
.../support/SeekPolicyKafkaConsumerResumeStrategy.java | 5 +++--
.../apache/camel/component/kafka/KafkaProducerTest.java | 4 ++--
.../integration/KafkaConsumerWithResumeStrategyIT.java | 4 ++--
18 files changed, 57 insertions(+), 53 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index 425788e..8b06f5b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -19,18 +19,18 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
import org.apache.camel.util.ObjectHelper;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
public class DefaultKafkaClientFactory implements KafkaClientFactory {
@Override
- public KafkaProducer getProducer(Properties kafkaProps) {
+ public Producer getProducer(Properties kafkaProps) {
return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
}
@Override
- public KafkaConsumer getConsumer(Properties kafkaProps) {
+ public Consumer getConsumer(Properties kafkaProps) {
return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
index 4db0dfb..e9765fe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -33,7 +33,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl
private final Collection<KafkaAsyncManualCommit> asyncCommits;
- public DefaultKafkaManualAsyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+ public DefaultKafkaManualAsyncCommit(Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository, TopicPartition partition,
long recordOffset, long commitTimeout,
Collection<KafkaAsyncManualCommit> asyncCommits) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
index 43ff1c2..51dc58d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
import org.apache.camel.Exchange;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
@Override
public KafkaManualCommit newInstance(
- Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+ Exchange exchange, Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository,
TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
return new DefaultKafkaManualAsyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index ea6581d..aee6ed6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -17,12 +17,12 @@
package org.apache.camel.component.kafka;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
- private final KafkaConsumer consumer;
+ private final Consumer consumer;
private final String topicName;
private final String threadId;
private final StateRepository<String, String> offsetRepository;
@@ -30,7 +30,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
private final long recordOffset;
private final long commitTimeout;
- public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, String threadId,
+ public DefaultKafkaManualCommit(Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository, TopicPartition partition,
long recordOffset, long commitTimeout) {
this.consumer = consumer;
@@ -55,7 +55,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
return String.valueOf(offset);
}
- public KafkaConsumer getConsumer() {
+ public Consumer getConsumer() {
return consumer;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
index 4080485..0fefc4e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
import org.apache.camel.Exchange;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
@Override
public KafkaManualCommit newInstance(
- Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+ Exchange exchange, Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository,
TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
return new DefaultKafkaManualSyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
index 83b2862..7d5dd86 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -31,7 +31,7 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple
private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualSyncCommit.class);
- public DefaultKafkaManualSyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+ public DefaultKafkaManualSyncCommit(Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository, TopicPartition partition,
long recordOffset, long commitTimeout) {
super(consumer, topicName, threadId, offsetRepository, partition, recordOffset, commitTimeout);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index d7de678..8ee33506 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -18,29 +18,29 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
/**
- * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} instances.
+ * Factory to create a new Kafka {@link Consumer} and Kafka {@link Producer} instances.
*/
public interface KafkaClientFactory {
/**
- * Creates a new instance of the {@link KafkaProducer} class.
+ * Creates a new instance of the Kafka {@link Producer} class.
*
* @param kafkaProps The producer configs.
* @return an instance of Kafka producer.
*/
- KafkaProducer getProducer(Properties kafkaProps);
+ Producer getProducer(Properties kafkaProps);
/**
- * Creates a new instance of the {@link KafkaConsumer} class.
+ * Creates a new instance of the Kafka {@link Consumer} class.
*
* @param kafkaProps The consumer configs.
* @return an instance of Kafka consumer.
*/
- KafkaConsumer getConsumer(Properties kafkaProps);
+ Consumer getConsumer(Properties kafkaProps);
/**
* URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 61fb686..2a487d1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -49,7 +49,7 @@ class KafkaFetchRecords implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
private final KafkaConsumer kafkaConsumer;
- private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+ private org.apache.kafka.clients.consumer.Consumer consumer;
private final String topicName;
private final Pattern topicPattern;
private final String threadId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
index d0b9302..b0f6144 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
@@ -20,7 +20,7 @@ import java.util.Collection;
import org.apache.camel.Exchange;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
/**
@@ -32,7 +32,7 @@ public interface KafkaManualCommitFactory {
* Creates a new instance
*/
KafkaManualCommit newInstance(
- Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+ Exchange exchange, Consumer consumer, String topicName, String threadId,
StateRepository<String, String> offsetRepository,
TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a89ecdd..97a6e6d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -56,7 +56,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
@SuppressWarnings("rawtypes")
- private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
+ private org.apache.kafka.clients.producer.Producer kafkaProducer;
private final KafkaEndpoint endpoint;
private final KafkaConfiguration configuration;
private ExecutorService workerPool;
@@ -89,7 +89,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
@SuppressWarnings("rawtypes")
- public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+ public org.apache.kafka.clients.producer.Producer getKafkaProducer() {
return kafkaProducer;
}
@@ -97,7 +97,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
* To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
*/
@SuppressWarnings("rawtypes")
- public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
+ public void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index d85579b..0deba22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -18,13 +18,15 @@
package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
* processing records.
*/
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsumer<?, ?>> {
+public interface KafkaConsumerResumeStrategy extends ResumeStrategy<Consumer<?, ?>> {
+
/**
* Perform the resume operation. This runs in the scope of the Kafka Consumer thread and may run concurrently with
* other consumer instances when the component is set up to use more than one of them. As such, implementations are
@@ -32,5 +34,5 @@ public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsume
*
* @param consumer an instance of the KafkaConsumer which is resuming the operation
*/
- void resume(KafkaConsumer<?, ?> consumer);
+ void resume(Consumer<?, ?> consumer);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 197a865..320cbfe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -34,8 +34,8 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
@@ -50,7 +50,7 @@ public class KafkaRecordProcessor {
private final boolean autoCommitEnabled;
private final KafkaConfiguration configuration;
private final Processor processor;
- private final KafkaConsumer<?, ?> consumer;
+ private final Consumer<?, ?> consumer;
private final KafkaManualCommitFactory manualCommitFactory;
private final String threadId;
private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits;
@@ -80,7 +80,7 @@ public class KafkaRecordProcessor {
}
public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration,
- Processor processor, KafkaConsumer<?, ?> consumer,
+ Processor processor, Consumer<?, ?> consumer,
KafkaManualCommitFactory manualCommitFactory,
String threadId, ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
this.autoCommitEnabled = autoCommitEnabled;
@@ -194,7 +194,7 @@ public class KafkaRecordProcessor {
}
public static void commitOffset(
- KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+ KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
boolean stopping, boolean forceCommit, String threadId) {
if (partitionLastOffset == START_OFFSET) {
@@ -221,7 +221,7 @@ public class KafkaRecordProcessor {
}
private static void commitOffset(
- KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition,
+ KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition,
long partitionLastOffset) {
long timeout = configuration.getCommitTimeoutMs();
consumer.commitSync(
@@ -230,7 +230,7 @@ public class KafkaRecordProcessor {
}
private static void forceSyncCommit(
- KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+ KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
String threadId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
@@ -247,7 +247,7 @@ public class KafkaRecordProcessor {
}
private static void commitSync(
- KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+ KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
String threadId) {
if (LOG.isDebugEnabled()) {
@@ -258,7 +258,7 @@ public class KafkaRecordProcessor {
}
private static void commitAsync(
- KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
+ Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 7c13eb1..5e690fa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -20,7 +20,7 @@ package org.apache.camel.component.kafka.consumer.support;
import java.util.Set;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +32,7 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
* A resume strategy that uses Kafka's offset for resuming
*/
public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
private final StateRepository<String, String> offsetRepository;
@@ -40,7 +41,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
this.offsetRepository = offsetRepository;
}
- private void resumeFromOffset(final KafkaConsumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
+ private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
// The state contains the last read offset, so you need to seek from the next one
long offset = deserializeOffsetValue(offsetState) + 1;
LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
@@ -48,7 +49,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
}
@Override
- public void resume(final KafkaConsumer<?, ?> consumer) {
+ public void resume(final Consumer<?, ?> consumer) {
Set<TopicPartition> assignments = consumer.assignment();
for (TopicPartition topicPartition : assignments) {
String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 0420041..1ffbd2f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,8 +22,8 @@ import java.util.Map;
import java.util.function.Supplier;
import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +36,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
private final String threadId;
private final String topicName;
private final KafkaConfiguration configuration;
- private final KafkaConsumer consumer;
+ private final Consumer consumer;
private final Map<String, Long> lastProcessedOffset;
private final KafkaConsumerResumeStrategy resumeStrategy;
private Supplier<Boolean> stopStateSupplier;
public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
- KafkaConsumer consumer, Map<String, Long> lastProcessedOffset,
+ Consumer consumer, Map<String, Long> lastProcessedOffset,
Supplier<Boolean> stopStateSupplier) {
this.threadId = threadId;
this.topicName = topicName;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 272f467..09f304b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +30,7 @@ public final class ResumeStrategyFactory {
private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
@SuppressWarnings("unused")
@Override
- public void resume(KafkaConsumer<?, ?> consumer) {
+ public void resume(Consumer<?, ?> consumer) {
// NO-OP
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 969f0df..1eba302 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -17,7 +17,7 @@
package org.apache.camel.component.kafka.consumer.support;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
* A resume strategy that uses Camel's seekTo configuration for resuming
*/
public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
private final String seekPolicy;
@@ -34,7 +35,7 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
}
@Override
- public void resume(final KafkaConsumer<?, ?> consumer) {
+ public void resume(final Consumer<?, ?> consumer) {
if (seekPolicy.equals("beginning")) {
LOG.debug("Seeking from the beginning of topic");
consumer.seekToBeginning(consumer.assignment());
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 3e287ee..9495020 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -131,7 +131,7 @@ public class KafkaProducerTest {
public void processSendsMessageWithException() {
endpoint.getConfiguration().setTopic("sometopic");
// setup the exception here
- org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+ org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
Mockito.when(kp.send(any(ProducerRecord.class))).thenThrow(new ApiException());
Mockito.when(exchange.getIn()).thenReturn(in);
Mockito.when(exchange.getMessage()).thenReturn(in);
@@ -165,7 +165,7 @@ public class KafkaProducerTest {
Mockito.when(exchange.getMessage()).thenReturn(in);
// setup the exception here
- org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+ org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
Mockito.when(kp.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new ApiException());
in.setHeader(KafkaConstants.PARTITION_KEY, 4);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index 192a1a6..15bd79c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -25,7 +25,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -53,7 +53,7 @@ public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupp
}
@Override
- public void resume(KafkaConsumer<?, ?> consumer) {
+ public void resume(Consumer<?, ?> consumer) {
resumeCalled = true;
if (consumer != null) {