You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:09 UTC

[camel] 03/16: CAMEL-15562: cleanup the creation of the strategy factories

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0992cbc2b28bb6bc385a7a4fe3d7eb16c6558fcc
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Feb 4 09:18:48 2022 +0100

    CAMEL-15562: cleanup the creation of the strategy factories
---
 .../apache/camel/component/kafka/KafkaFetchRecords.java   | 10 ++++++----
 .../consumer/support/PartitionAssignmentListener.java     | 14 +++-----------
 .../kafka/consumer/support/ResumeStrategyFactory.java     | 15 ++++++++++++++-
 3 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index deeef3c..91728e3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -32,6 +32,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStra
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -137,12 +138,13 @@ class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
-        KafkaConsumerResumeStrategy userProvidedStrategy
-                = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class);
+
+        KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
+        resumeStrategy.setConsumer(consumer);
 
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset,
-                this::isRunnable, commitManager, userProvidedStrategy);
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(), lastProcessedOffset,
+                this::isRunnable, commitManager, resumeStrategy);
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index fe19147..482f804 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.consumer.CommitManager;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -41,10 +40,10 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private final CommitManager commitManager;
-    private Supplier<Boolean> stopStateSupplier;
+    private final Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
-                                       Consumer consumer, Map<String, Long> lastProcessedOffset,
+                                       Map<String, Long> lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier, CommitManager commitManager,
                                        KafkaConsumerResumeStrategy resumeStrategy) {
         this.threadId = threadId;
@@ -52,15 +51,8 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
         this.lastProcessedOffset = lastProcessedOffset;
         this.commitManager = commitManager;
         this.stopStateSupplier = stopStateSupplier;
+        this.resumeStrategy = resumeStrategy;
 
-        if (resumeStrategy == null) {
-            LOG.info("No resume strategy was provided ... checking for builtins ...");
-            this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration);
-        } else {
-            LOG.info("Using user-provided strategy");
-            this.resumeStrategy = resumeStrategy;
-        }
-        resumeStrategy.setConsumer(consumer);
     }
 
     @Override
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 67707de..c2648ce 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
@@ -53,8 +54,19 @@ public final class ResumeStrategyFactory {
     private ResumeStrategyFactory() {
     }
 
-    public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConfiguration configuration) {
+    public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) {
+        // A strategy registered as a service (as resumable routes do) takes priority over everything else
+        KafkaConsumerResumeStrategy resumableRouteStrategy
+                = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class);
+
+        if (resumableRouteStrategy != null) {
+            return resumableRouteStrategy;
+        }
+
+        KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
+
         if (configuration.getResumeStrategy() != null) {
+            LOG.info("Using user-provided strategy");
             return configuration.getResumeStrategy();
         }
 
@@ -62,6 +74,7 @@ public final class ResumeStrategyFactory {
     }
 
     private static KafkaConsumerResumeStrategy builtinResumeStrategies(KafkaConfiguration configuration) {
+        LOG.debug("No resume strategy was provided ... checking for built-ins ...");
         StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
         String seekTo = configuration.getSeekTo();