Posted to commits@camel.apache.org by da...@apache.org on 2021/12/01 08:59:32 UTC

[camel] 01/02: CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3a74f8fc2d5969965710fa11bf1802c78f5eba4b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 1 09:55:45 2021 +0100

    CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
---
 .../camel/component/kafka/DefaultKafkaClientFactory.java |  8 ++++----
 .../component/kafka/DefaultKafkaManualAsyncCommit.java   |  4 ++--
 .../kafka/DefaultKafkaManualAsyncCommitFactory.java      |  4 ++--
 .../camel/component/kafka/DefaultKafkaManualCommit.java  |  8 ++++----
 .../component/kafka/DefaultKafkaManualCommitFactory.java |  4 ++--
 .../component/kafka/DefaultKafkaManualSyncCommit.java    |  4 ++--
 .../apache/camel/component/kafka/KafkaClientFactory.java | 14 +++++++-------
 .../apache/camel/component/kafka/KafkaFetchRecords.java  |  2 +-
 .../camel/component/kafka/KafkaManualCommitFactory.java  |  4 ++--
 .../org/apache/camel/component/kafka/KafkaProducer.java  |  6 +++---
 .../consumer/support/KafkaConsumerResumeStrategy.java    |  6 ++++--
 .../kafka/consumer/support/KafkaRecordProcessor.java     | 16 ++++++++--------
 .../support/OffsetKafkaConsumerResumeStrategy.java       |  7 ++++---
 .../consumer/support/PartitionAssignmentListener.java    |  6 +++---
 .../kafka/consumer/support/ResumeStrategyFactory.java    |  4 ++--
 .../support/SeekPolicyKafkaConsumerResumeStrategy.java   |  5 +++--
 .../apache/camel/component/kafka/KafkaProducerTest.java  |  4 ++--
 .../integration/KafkaConsumerWithResumeStrategyIT.java   |  4 ++--
 18 files changed, 57 insertions(+), 53 deletions(-)
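
The practical effect of the change below: because the factory now returns the org.apache.kafka.clients.producer.Producer and org.apache.kafka.clients.consumer.Consumer interfaces, a custom KafkaClientFactory can hand back non-broker implementations such as Kafka's own MockProducer/MockConsumer. A minimal sketch; the class name and mock wiring are illustrative, not part of this commit:

    import java.util.Properties;

    import org.apache.camel.component.kafka.DefaultKafkaClientFactory;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Illustrative test-only factory, possible now that the factory methods
    // return the Producer/Consumer interfaces instead of the concrete clients.
    public class MockKafkaClientFactory extends DefaultKafkaClientFactory {

        @Override
        public Producer getProducer(Properties kafkaProps) {
            // autoComplete=true completes send() futures immediately in tests
            return new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        }

        @Override
        public Consumer getConsumer(Properties kafkaProps) {
            return new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        }
    }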

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index 425788e..8b06f5b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -19,18 +19,18 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 
 import org.apache.camel.util.ObjectHelper;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 
 public class DefaultKafkaClientFactory implements KafkaClientFactory {
 
     @Override
-    public KafkaProducer getProducer(Properties kafkaProps) {
+    public Producer getProducer(Properties kafkaProps) {
         return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
     }
 
     @Override
-    public KafkaConsumer getConsumer(Properties kafkaProps) {
+    public Consumer getConsumer(Properties kafkaProps) {
         return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
index 4db0dfb..e9765fe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -33,7 +33,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl
 
     private final Collection<KafkaAsyncManualCommit> asyncCommits;
 
-    public DefaultKafkaManualAsyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualAsyncCommit(Consumer consumer, String topicName, String threadId,
                                          StateRepository<String, String> offsetRepository, TopicPartition partition,
                                          long recordOffset, long commitTimeout,
                                          Collection<KafkaAsyncManualCommit> asyncCommits) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
index 43ff1c2..51dc58d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
 
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
         return new DefaultKafkaManualAsyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index ea6581d..aee6ed6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -17,12 +17,12 @@
 package org.apache.camel.component.kafka;
 
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
 
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private final String topicName;
     private final String threadId;
     private final StateRepository<String, String> offsetRepository;
@@ -30,7 +30,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
     private final long recordOffset;
     private final long commitTimeout;
 
-    public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualCommit(Consumer consumer, String topicName, String threadId,
                                     StateRepository<String, String> offsetRepository, TopicPartition partition,
                                     long recordOffset, long commitTimeout) {
         this.consumer = consumer;
@@ -55,7 +55,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
         return String.valueOf(offset);
     }
 
-    public KafkaConsumer getConsumer() {
+    public Consumer getConsumer() {
         return consumer;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
index 4080485..0fefc4e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
 
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
         return new DefaultKafkaManualSyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
index 83b2862..7d5dd86 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -31,7 +31,7 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualSyncCommit.class);
 
-    public DefaultKafkaManualSyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualSyncCommit(Consumer consumer, String topicName, String threadId,
                                         StateRepository<String, String> offsetRepository, TopicPartition partition,
                                         long recordOffset, long commitTimeout) {
         super(consumer, topicName, threadId, offsetRepository, partition, recordOffset, commitTimeout);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index d7de678..8ee33506 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -18,29 +18,29 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 
 /**
- * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} instances.
+ * Factory to create a new Kafka {@link Consumer} and Kafka {@link Producer} instances.
  */
 public interface KafkaClientFactory {
 
     /**
-     * Creates a new instance of the {@link KafkaProducer} class.
+     * Creates a new instance of the Kafka {@link Producer} class.
      * 
      * @param  kafkaProps The producer configs.
      * @return            an instance of Kafka producer.
      */
-    KafkaProducer getProducer(Properties kafkaProps);
+    Producer getProducer(Properties kafkaProps);
 
     /**
-     * Creates a new instance of the {@link KafkaConsumer} class.
+     * Creates a new instance of the Kafka {@link Consumer} class.
      * 
      * @param  kafkaProps The consumer configs.
      * @return            an instance of Kafka consumer.
      */
-    KafkaConsumer getConsumer(Properties kafkaProps);
+    Consumer getConsumer(Properties kafkaProps);
 
     /**
      * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
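
A hedged wiring sketch for a custom factory such as the one above: it assumes KafkaComponent exposes a kafkaClientFactory setter (not shown in this patch) and reuses the illustrative MockKafkaClientFactory from the earlier sketch:

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.camel.impl.DefaultCamelContext;

    // Hedged wiring sketch; setKafkaClientFactory is assumed to exist on
    // KafkaComponent outside this patch, MockKafkaClientFactory is illustrative.
    public final class MockFactoryWiring {
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();
            KafkaComponent kafka = context.getComponent("kafka", KafkaComponent.class);
            kafka.setKafkaClientFactory(new MockKafkaClientFactory());
            context.start();
            // kafka: endpoints created from here on use the mocked clients
            context.stop();
        }
    }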
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 61fb686..2a487d1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -49,7 +49,7 @@ class KafkaFetchRecords implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
 
     private final KafkaConsumer kafkaConsumer;
-    private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+    private org.apache.kafka.clients.consumer.Consumer consumer;
     private final String topicName;
     private final Pattern topicPattern;
     private final String threadId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
index d0b9302..b0f6144 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
@@ -20,7 +20,7 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -32,7 +32,7 @@ public interface KafkaManualCommitFactory {
      * Creates a new instance
      */
     KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a89ecdd..97a6e6d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -56,7 +56,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
 
     @SuppressWarnings("rawtypes")
-    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
+    private org.apache.kafka.clients.producer.Producer kafkaProducer;
     private final KafkaEndpoint endpoint;
     private final KafkaConfiguration configuration;
     private ExecutorService workerPool;
@@ -89,7 +89,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     }
 
     @SuppressWarnings("rawtypes")
-    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+    public org.apache.kafka.clients.producer.Producer getKafkaProducer() {
         return kafkaProducer;
     }
 
@@ -97,7 +97,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
      * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
      */
     @SuppressWarnings("rawtypes")
-    public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
+    public void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer) {
         this.kafkaProducer = kafkaProducer;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index d85579b..0deba22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -18,13 +18,15 @@
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
  * processing records.
  */
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsumer<?, ?>> {
+public interface KafkaConsumerResumeStrategy extends ResumeStrategy<Consumer<?, ?>> {
+
     /**
      * Perform the resume operation. This runs in the scope of the Kafka Consumer thread and may run concurrently with
      * other consumer instances when the component is set up to use more than one of them. As such, implementations are
@@ -32,5 +34,5 @@ public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsume
      *
      * @param consumer an instance of the KafkaConsumer which is resuming the operation
      */
-    void resume(KafkaConsumer<?, ?> consumer);
+    void resume(Consumer<?, ?> consumer);
 }
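
A minimal sketch of a custom strategy written against the new interface signature; the class name and the seek-to-end behaviour are illustrative, not part of this commit:

    import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
    import org.apache.kafka.clients.consumer.Consumer;

    // Illustrative strategy: seeks every assigned partition to the end.
    // Stateless, so it is safe to share across the concurrent consumer
    // threads mentioned in the javadoc above.
    public class SeekToEndResumeStrategy implements KafkaConsumerResumeStrategy {

        @Override
        public void resume(Consumer<?, ?> consumer) {
            consumer.seekToEnd(consumer.assignment());
        }
    }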
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 197a865..320cbfe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -34,8 +34,8 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
@@ -50,7 +50,7 @@ public class KafkaRecordProcessor {
     private final boolean autoCommitEnabled;
     private final KafkaConfiguration configuration;
     private final Processor processor;
-    private final KafkaConsumer<?, ?> consumer;
+    private final Consumer<?, ?> consumer;
     private final KafkaManualCommitFactory manualCommitFactory;
     private final String threadId;
     private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits;
@@ -80,7 +80,7 @@ public class KafkaRecordProcessor {
     }
 
     public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration,
-                                Processor processor, KafkaConsumer<?, ?> consumer,
+                                Processor processor, Consumer<?, ?> consumer,
                                 KafkaManualCommitFactory manualCommitFactory,
                                 String threadId, ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
         this.autoCommitEnabled = autoCommitEnabled;
@@ -194,7 +194,7 @@ public class KafkaRecordProcessor {
     }
 
     public static void commitOffset(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             boolean stopping, boolean forceCommit, String threadId) {
 
         if (partitionLastOffset == START_OFFSET) {
@@ -221,7 +221,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitOffset(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition,
             long partitionLastOffset) {
         long timeout = configuration.getCommitTimeoutMs();
         consumer.commitSync(
@@ -230,7 +230,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void forceSyncCommit(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             String threadId) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
@@ -247,7 +247,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitSync(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             String threadId) {
 
         if (LOG.isDebugEnabled()) {
@@ -258,7 +258,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitAsync(
-            KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
+            Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 7c13eb1..5e690fa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -20,7 +20,7 @@ package org.apache.camel.component.kafka.consumer.support;
 import java.util.Set;
 
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +32,7 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
  * A resume strategy that uses Kafka's offset for resuming
  */
 public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
     private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
 
     private final StateRepository<String, String> offsetRepository;
@@ -40,7 +41,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
         this.offsetRepository = offsetRepository;
     }
 
-    private void resumeFromOffset(final KafkaConsumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
+    private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
         LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
@@ -48,7 +49,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
     }
 
     @Override
-    public void resume(final KafkaConsumer<?, ?> consumer) {
+    public void resume(final Consumer<?, ?> consumer) {
         Set<TopicPartition> assignments = consumer.assignment();
         for (TopicPartition topicPartition : assignments) {
             String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 0420041..1ffbd2f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,13 +36,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private final String threadId;
     private final String topicName;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
-                                       KafkaConsumer consumer, Map<String, Long> lastProcessedOffset,
+                                       Consumer consumer, Map<String, Long> lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier) {
         this.threadId = threadId;
         this.topicName = topicName;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 272f467..09f304b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +30,7 @@ public final class ResumeStrategyFactory {
     private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
         @SuppressWarnings("unused")
         @Override
-        public void resume(KafkaConsumer<?, ?> consumer) {
+        public void resume(Consumer<?, ?> consumer) {
             // NO-OP
         }
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 969f0df..1eba302 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
 public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
     private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
 
     private final String seekPolicy;
@@ -34,7 +35,7 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
     }
 
     @Override
-    public void resume(final KafkaConsumer<?, ?> consumer) {
+    public void resume(final Consumer<?, ?> consumer) {
         if (seekPolicy.equals("beginning")) {
             LOG.debug("Seeking from the beginning of topic");
             consumer.seekToBeginning(consumer.assignment());
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 3e287ee..9495020 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -131,7 +131,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithException() {
         endpoint.getConfiguration().setTopic("sometopic");
         // setup the exception here
-        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
         Mockito.when(kp.send(any(ProducerRecord.class))).thenThrow(new ApiException());
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getMessage()).thenReturn(in);
@@ -165,7 +165,7 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getMessage()).thenReturn(in);
 
         // setup the exception here
-        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
         Mockito.when(kp.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new ApiException());
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
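
The test now stubs the Producer interface directly with Mockito. The same pattern in isolation, as a hedged sketch; the helper class and method names are illustrative:

    import static org.mockito.ArgumentMatchers.any;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ApiException;
    import org.mockito.Mockito;

    // Illustrative helper mirroring the updated test: stub the Producer
    // interface so send() fails, then hand it to the Camel producer via
    // the setKafkaProducer() setter changed in this commit.
    public final class ProducerMockingSketch {

        @SuppressWarnings({ "rawtypes", "unchecked" })
        static void stubFailingSend(org.apache.camel.component.kafka.KafkaProducer camelProducer) {
            Producer kp = Mockito.mock(Producer.class);
            Mockito.when(kp.send(any(ProducerRecord.class))).thenThrow(new ApiException());
            camelProducer.setKafkaProducer(kp);
        }
    }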
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index 192a1a6..15bd79c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -25,7 +25,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -53,7 +53,7 @@ public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupp
         }
 
         @Override
-        public void resume(KafkaConsumer<?, ?> consumer) {
+        public void resume(Consumer<?, ?> consumer) {
             resumeCalled = true;
 
             if (consumer != null) {