You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/12/01 08:59:31 UTC

[camel] branch main updated (5e58bcf -> a464225)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 5e58bcf  Sync deps
     new 3a74f8f  CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
     new a464225  CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/DefaultKafkaClientFactory.java |  8 ++++----
 .../component/kafka/DefaultKafkaManualAsyncCommit.java   |  4 ++--
 .../kafka/DefaultKafkaManualAsyncCommitFactory.java      |  4 ++--
 .../camel/component/kafka/DefaultKafkaManualCommit.java  |  8 ++++----
 .../component/kafka/DefaultKafkaManualCommitFactory.java |  4 ++--
 .../component/kafka/DefaultKafkaManualSyncCommit.java    |  4 ++--
 .../apache/camel/component/kafka/KafkaClientFactory.java | 14 +++++++-------
 .../apache/camel/component/kafka/KafkaFetchRecords.java  |  2 +-
 .../camel/component/kafka/KafkaManualCommitFactory.java  |  4 ++--
 .../org/apache/camel/component/kafka/KafkaProducer.java  |  6 +++---
 .../consumer/support/KafkaConsumerResumeStrategy.java    |  6 ++++--
 .../kafka/consumer/support/KafkaRecordProcessor.java     | 16 ++++++++--------
 .../support/OffsetKafkaConsumerResumeStrategy.java       |  7 ++++---
 .../consumer/support/PartitionAssignmentListener.java    |  6 +++---
 .../kafka/consumer/support/ResumeStrategyFactory.java    |  4 ++--
 .../support/SeekPolicyKafkaConsumerResumeStrategy.java   |  5 +++--
 .../apache/camel/component/kafka/KafkaProducerTest.java  |  4 ++--
 .../integration/KafkaConsumerWithResumeStrategyIT.java   |  4 ++--
 .../modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc  |  5 +++++
 19 files changed, 62 insertions(+), 53 deletions(-)

[camel] 01/02: CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3a74f8fc2d5969965710fa11bf1802c78f5eba4b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 1 09:55:45 2021 +0100

    CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
---
 .../camel/component/kafka/DefaultKafkaClientFactory.java |  8 ++++----
 .../component/kafka/DefaultKafkaManualAsyncCommit.java   |  4 ++--
 .../kafka/DefaultKafkaManualAsyncCommitFactory.java      |  4 ++--
 .../camel/component/kafka/DefaultKafkaManualCommit.java  |  8 ++++----
 .../component/kafka/DefaultKafkaManualCommitFactory.java |  4 ++--
 .../component/kafka/DefaultKafkaManualSyncCommit.java    |  4 ++--
 .../apache/camel/component/kafka/KafkaClientFactory.java | 14 +++++++-------
 .../apache/camel/component/kafka/KafkaFetchRecords.java  |  2 +-
 .../camel/component/kafka/KafkaManualCommitFactory.java  |  4 ++--
 .../org/apache/camel/component/kafka/KafkaProducer.java  |  6 +++---
 .../consumer/support/KafkaConsumerResumeStrategy.java    |  6 ++++--
 .../kafka/consumer/support/KafkaRecordProcessor.java     | 16 ++++++++--------
 .../support/OffsetKafkaConsumerResumeStrategy.java       |  7 ++++---
 .../consumer/support/PartitionAssignmentListener.java    |  6 +++---
 .../kafka/consumer/support/ResumeStrategyFactory.java    |  4 ++--
 .../support/SeekPolicyKafkaConsumerResumeStrategy.java   |  5 +++--
 .../apache/camel/component/kafka/KafkaProducerTest.java  |  4 ++--
 .../integration/KafkaConsumerWithResumeStrategyIT.java   |  4 ++--
 18 files changed, 57 insertions(+), 53 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
index 425788e..8b06f5b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java
@@ -19,18 +19,18 @@ package org.apache.camel.component.kafka;
 import java.util.Properties;
 
 import org.apache.camel.util.ObjectHelper;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 
 public class DefaultKafkaClientFactory implements KafkaClientFactory {
 
     @Override
-    public KafkaProducer getProducer(Properties kafkaProps) {
+    public Producer getProducer(Properties kafkaProps) {
         return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps);
     }
 
     @Override
-    public KafkaConsumer getConsumer(Properties kafkaProps) {
+    public Consumer getConsumer(Properties kafkaProps) {
         return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
index 4db0dfb..e9765fe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -33,7 +33,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl
 
     private final Collection<KafkaAsyncManualCommit> asyncCommits;
 
-    public DefaultKafkaManualAsyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualAsyncCommit(Consumer consumer, String topicName, String threadId,
                                          StateRepository<String, String> offsetRepository, TopicPartition partition,
                                          long recordOffset, long commitTimeout,
                                          Collection<KafkaAsyncManualCommit> asyncCommits) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
index 43ff1c2..51dc58d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory {
 
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
         return new DefaultKafkaManualAsyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
index ea6581d..aee6ed6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java
@@ -17,12 +17,12 @@
 package org.apache.camel.component.kafka;
 
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
 
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private final String topicName;
     private final String threadId;
     private final StateRepository<String, String> offsetRepository;
@@ -30,7 +30,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
     private final long recordOffset;
     private final long commitTimeout;
 
-    public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualCommit(Consumer consumer, String topicName, String threadId,
                                     StateRepository<String, String> offsetRepository, TopicPartition partition,
                                     long recordOffset, long commitTimeout) {
         this.consumer = consumer;
@@ -55,7 +55,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit {
         return String.valueOf(offset);
     }
 
-    public KafkaConsumer getConsumer() {
+    public Consumer getConsumer() {
         return consumer;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
index 4080485..0fefc4e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java
@@ -20,14 +20,14 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory {
 
     @Override
     public KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) {
         return new DefaultKafkaManualSyncCommit(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
index 83b2862..7d5dd86 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -31,7 +31,7 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualSyncCommit.class);
 
-    public DefaultKafkaManualSyncCommit(KafkaConsumer consumer, String topicName, String threadId,
+    public DefaultKafkaManualSyncCommit(Consumer consumer, String topicName, String threadId,
                                         StateRepository<String, String> offsetRepository, TopicPartition partition,
                                         long recordOffset, long commitTimeout) {
         super(consumer, topicName, threadId, offsetRepository, partition, recordOffset, commitTimeout);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
index d7de678..8ee33506 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java
@@ -18,29 +18,29 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 
 /**
- * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} instances.
+ * Factory to create a new Kafka {@link Consumer} and Kafka {@link Producer} instances.
  */
 public interface KafkaClientFactory {
 
     /**
-     * Creates a new instance of the {@link KafkaProducer} class.
+     * Creates a new instance of the Kafka {@link Producer} class.
      * 
      * @param  kafkaProps The producer configs.
      * @return            an instance of Kafka producer.
      */
-    KafkaProducer getProducer(Properties kafkaProps);
+    Producer getProducer(Properties kafkaProps);
 
     /**
-     * Creates a new instance of the {@link KafkaConsumer} class.
+     * Creates a new instance of the Kafka {@link Consumer} class.
      * 
      * @param  kafkaProps The consumer configs.
      * @return            an instance of Kafka consumer.
      */
-    KafkaConsumer getConsumer(Properties kafkaProps);
+    Consumer getConsumer(Properties kafkaProps);
 
     /**
      * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 61fb686..2a487d1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -49,7 +49,7 @@ class KafkaFetchRecords implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
 
     private final KafkaConsumer kafkaConsumer;
-    private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+    private org.apache.kafka.clients.consumer.Consumer consumer;
     private final String topicName;
     private final Pattern topicPattern;
     private final String threadId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
index d0b9302..b0f6144 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
@@ -20,7 +20,7 @@ import java.util.Collection;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -32,7 +32,7 @@ public interface KafkaManualCommitFactory {
      * Creates a new instance
      */
     KafkaManualCommit newInstance(
-            Exchange exchange, KafkaConsumer consumer, String topicName, String threadId,
+            Exchange exchange, Consumer consumer, String topicName, String threadId,
             StateRepository<String, String> offsetRepository,
             TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a89ecdd..97a6e6d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -56,7 +56,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
 
     @SuppressWarnings("rawtypes")
-    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
+    private org.apache.kafka.clients.producer.Producer kafkaProducer;
     private final KafkaEndpoint endpoint;
     private final KafkaConfiguration configuration;
     private ExecutorService workerPool;
@@ -89,7 +89,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     }
 
     @SuppressWarnings("rawtypes")
-    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+    public org.apache.kafka.clients.producer.Producer getKafkaProducer() {
         return kafkaProducer;
     }
 
@@ -97,7 +97,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
      * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
      */
     @SuppressWarnings("rawtypes")
-    public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
+    public void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer) {
         this.kafkaProducer = kafkaProducer;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index d85579b..0deba22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -18,13 +18,15 @@
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
  * processing records.
  */
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsumer<?, ?>> {
+public interface KafkaConsumerResumeStrategy extends ResumeStrategy<Consumer<?, ?>> {
+
     /**
      * Perform the resume operation. This runs in the scope of the Kafka Consumer thread and may run concurrently with
      * other consumer instances when the component is set up to use more than one of them. As such, implementations are
@@ -32,5 +34,5 @@ public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsume
      *
      * @param consumer an instance of the KafkaConsumer which is resuming the operation
      */
-    void resume(KafkaConsumer<?, ?> consumer);
+    void resume(Consumer<?, ?> consumer);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 197a865..320cbfe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -34,8 +34,8 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
@@ -50,7 +50,7 @@ public class KafkaRecordProcessor {
     private final boolean autoCommitEnabled;
     private final KafkaConfiguration configuration;
     private final Processor processor;
-    private final KafkaConsumer<?, ?> consumer;
+    private final Consumer<?, ?> consumer;
     private final KafkaManualCommitFactory manualCommitFactory;
     private final String threadId;
     private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits;
@@ -80,7 +80,7 @@ public class KafkaRecordProcessor {
     }
 
     public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration,
-                                Processor processor, KafkaConsumer<?, ?> consumer,
+                                Processor processor, Consumer<?, ?> consumer,
                                 KafkaManualCommitFactory manualCommitFactory,
                                 String threadId, ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
         this.autoCommitEnabled = autoCommitEnabled;
@@ -194,7 +194,7 @@ public class KafkaRecordProcessor {
     }
 
     public static void commitOffset(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             boolean stopping, boolean forceCommit, String threadId) {
 
         if (partitionLastOffset == START_OFFSET) {
@@ -221,7 +221,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitOffset(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition,
             long partitionLastOffset) {
         long timeout = configuration.getCommitTimeoutMs();
         consumer.commitSync(
@@ -230,7 +230,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void forceSyncCommit(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             String threadId) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
@@ -247,7 +247,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitSync(
-            KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset,
             String threadId) {
 
         if (LOG.isDebugEnabled()) {
@@ -258,7 +258,7 @@ public class KafkaRecordProcessor {
     }
 
     private static void commitAsync(
-            KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
+            Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 7c13eb1..5e690fa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -20,7 +20,7 @@ package org.apache.camel.component.kafka.consumer.support;
 import java.util.Set;
 
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +32,7 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
  * A resume strategy that uses Kafka's offset for resuming
  */
 public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
     private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
 
     private final StateRepository<String, String> offsetRepository;
@@ -40,7 +41,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
         this.offsetRepository = offsetRepository;
     }
 
-    private void resumeFromOffset(final KafkaConsumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
+    private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
         LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
@@ -48,7 +49,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
     }
 
     @Override
-    public void resume(final KafkaConsumer<?, ?> consumer) {
+    public void resume(final Consumer<?, ?> consumer) {
         Set<TopicPartition> assignments = consumer.assignment();
         for (TopicPartition topicPartition : assignments) {
             String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 0420041..1ffbd2f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,13 +36,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private final String threadId;
     private final String topicName;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumer consumer;
+    private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
-                                       KafkaConsumer consumer, Map<String, Long> lastProcessedOffset,
+                                       Consumer consumer, Map<String, Long> lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier) {
         this.threadId = threadId;
         this.topicName = topicName;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 272f467..09f304b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +30,7 @@ public final class ResumeStrategyFactory {
     private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
         @SuppressWarnings("unused")
         @Override
-        public void resume(KafkaConsumer<?, ?> consumer) {
+        public void resume(Consumer<?, ?> consumer) {
             // NO-OP
         }
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 969f0df..1eba302 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
 public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+
     private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
 
     private final String seekPolicy;
@@ -34,7 +35,7 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
     }
 
     @Override
-    public void resume(final KafkaConsumer<?, ?> consumer) {
+    public void resume(final Consumer<?, ?> consumer) {
         if (seekPolicy.equals("beginning")) {
             LOG.debug("Seeking from the beginning of topic");
             consumer.seekToBeginning(consumer.assignment());
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 3e287ee..9495020 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -131,7 +131,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithException() {
         endpoint.getConfiguration().setTopic("sometopic");
         // setup the exception here
-        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
         Mockito.when(kp.send(any(ProducerRecord.class))).thenThrow(new ApiException());
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getMessage()).thenReturn(in);
@@ -165,7 +165,7 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getMessage()).thenReturn(in);
 
         // setup the exception here
-        org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
+        org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer();
         Mockito.when(kp.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new ApiException());
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index 192a1a6..15bd79c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -25,7 +25,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -53,7 +53,7 @@ public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupp
         }
 
         @Override
-        public void resume(KafkaConsumer<?, ?> consumer) {
+        public void resume(Consumer<?, ?> consumer) {
             resumeCalled = true;
 
             if (consumer != null) {

[camel] 02/02: CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a464225245b8dc025ea598a25c50e8e8e0345838
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 1 09:58:15 2021 +0100

    CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing.
---
 docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
index 834c86e..ec14503 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
@@ -10,6 +10,11 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
 
 Added method `updateRoutesToCamelContext` to `org.apache.camel.RoutesBuilder` interface.
 
+=== camel-kafka
+
+The APIs in the `camel-kafka` component have changed from using the Kafka client classes `org.apache.kafka.clients.producer.KafkaProducer` and `org.apache.kafka.clients.consumer.KafkaConsumer`
+to using the corresponding interfaces `org.apache.kafka.clients.producer.Producer` and `org.apache.kafka.clients.consumer.Consumer`.
+
 === camel-jbang
 
 The option `debug-level` has been renamed to `logging-level` because the option is for configuring the logging level.