You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 16:26:38 UTC

[camel] branch camel-3.18.x updated (98c46d8061c -> 5fedf91e806)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 98c46d8061c CAMEL-18371: do load the cache when resuming with the file component
     new 9e832b0428c CAMEL-18356: consolidated the Kafka resume strategies
     new b4c8000991b (chores) camel-kafka: fixed typo in the resume strategy
     new 8bf4ae69846 (chores) camel-kafka: add log guard on resume strategy critical path
     new fe095bc545b CAMEL-18362: rework the consumer/producer
     new 28e1abba16c (chores) camel-kafka: ensure tests run a little bit faster
     new 5fedf91e806 CAMEL-18362: wait for the initial cache loading

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/KafkaResumeStrategyConfiguration.java    |  23 +++
 .../KafkaResumeStrategyConfigurationBuilder.java   |  17 ++
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java |  75 +------
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 229 ++++++++++++---------
 .../KafkaConsumerAsyncManualCommitIT.java          |   2 +-
 .../KafkaConsumerBadPortHealthCheckIT.java         |   2 +-
 ...fkaConsumerBadPortSupervisingHealthCheckIT.java |   2 +-
 .../kafka/integration/KafkaConsumerFullIT.java     |   2 +-
 .../integration/KafkaConsumerHealthCheckIT.java    |   2 +-
 .../integration/KafkaConsumerIdempotentIT.java     |   2 +-
 ...kaConsumerIdempotentWithCustomSerializerIT.java |   2 +-
 .../KafkaConsumerIdempotentWithProcessorIT.java    |   2 +-
 .../integration/KafkaConsumerTopicIsPatternIT.java |   5 +-
 .../KafkaConsumerUnresolvableHealthCheckIT.java    |   2 +-
 .../commit/KafkaConsumerNoopCommitIT.java          |   2 +-
 .../commit/KafkaConsumerSyncCommitIT.java          |   2 +-
 .../KafkaPausableConsumerCircuitBreakerIT.java     |   2 +-
 .../integration/pause/KafkaPausableConsumerIT.java |   2 +-
 18 files changed, 191 insertions(+), 184 deletions(-)


[camel] 01/06: CAMEL-18356: consolidated the Kafka resume strategies

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9e832b0428cf7f0101e9be2cb6b9c19f4958b5fd
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 16:50:25 2022 +0200

    CAMEL-18356: consolidated the Kafka resume strategies
---
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 64 +---------------------
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 60 ++++++++++++++++----
 2 files changed, 50 insertions(+), 74 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 4cfe256fb3d..82580200f03 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory;
  *
  * @param <K> the type of key
  */
+@Deprecated
 public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> {
     private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
-    private final ExecutorService executorService;
 
     /**
      * Create a new instance of this class
@@ -64,68 +64,8 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
 
     public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
                                         ExecutorService executorService) {
-        super(resumeStrategyConfiguration);
-
-        this.executorService = executorService;
-    }
-
-    protected void poll() {
-        poll(getConsumer());
-    }
-
-    protected void poll(Consumer<byte[], byte[]> consumer) {
-        Deserializable deserializable = (Deserializable) getAdapter();
-
-        ConsumerRecords<byte[], byte[]> records;
-        do {
-            records = consume(10, consumer);
-
-            if (records.isEmpty()) {
-                break;
-            }
-
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                byte[] value = record.value();
-
-                LOG.trace("Read from Kafka: {}", value);
-
-                deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()));
-            }
-        } while (true);
+        super(resumeStrategyConfiguration, executorService);
     }
 
-    @Override
-    public void loadCache() throws Exception {
-        super.loadCache();
 
-        executorService.submit(() -> refresh());
-    }
-
-    /**
-     * Launch a thread to refresh the offsets periodically
-     */
-    private void refresh() {
-        LOG.trace("Creating a offset cache refresher");
-        try {
-            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
-            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
-            try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
-                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
-                poll(consumer);
-            }
-        } catch (Exception e) {
-            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        try {
-            executorService.shutdown();
-        } finally {
-            super.stop();
-        }
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 870185127de..db87f65074a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,13 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Cacheable;
@@ -37,6 +41,7 @@ import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,9 +55,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
- * integrations. For multi-node integrations (i.e: using clusters with the master component check
- * {@link MultiNodeKafkaResumeStrategy}.
- *
+ * integrations.
  */
 public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
@@ -66,15 +69,29 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private boolean subscribed;
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+    private final ExecutorService executorService;
 
     /**
      * Builds an instance of this class
-     * 
+     *
      * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+     *
+     */
+    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
+                                         ExecutorService executorService) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+        this.executorService = executorService;
     }
 
     /**
@@ -125,7 +142,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Loads the existing data into the cache
-     * 
+     *
      * @throws Exception
      */
     public void loadCache() throws Exception {
@@ -138,9 +155,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
-        poll();
 
-        unsubscribe();
+        executorService.submit(() -> refresh());
     }
 
     protected void poll() {
@@ -160,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
                 LOG.trace("Read from Kafka: {}", value);
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
-                    break;
+                    LOG.warn("Deserializar indicates that this is the last record to deserialize");
                 }
             }
         } while (true);
@@ -168,7 +184,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Subscribe to the topic if not subscribed yet
-     * 
+     *
      * @param topic the topic to consume the messages from
      */
     protected void checkAndSubscribe(String topic) {
@@ -181,7 +197,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Subscribe to the topic if not subscribed yet
-     * 
+     *
      * @param topic     the topic to consume the messages from
      * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
      */
@@ -196,7 +212,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     /**
      * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
      * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
-     * 
+     *
      * @param  remaining the number of remaining messages on the topic to try to collect
      * @return
      */
@@ -252,7 +268,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Consumes message from the topic previously setup
-     * 
+     *
      * @param  retries how many times to retry consuming data from the topic
      * @return         An instance of the consumer records
      */
@@ -349,6 +365,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
         LOG.info("Closing the Kafka consumer");
         IOHelper.close(producer, "Kafka consumer", LOG);
+
+        executorService.shutdown();
     }
 
     @Override
@@ -388,4 +406,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return resumeStrategyConfiguration;
     }
 
+    /**
+     * Launch a thread to refresh the offsets periodically
+     */
+    private void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+        try {
+            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
+            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+            try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
+                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+                poll();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+        }
+    }
 }


[camel] 04/06: CAMEL-18362: rework the consumer/producer

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fe095bc545b6212ea3e8e72265e7c4df3b9c36a7
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Aug 8 16:31:58 2022 +0200

    CAMEL-18362: rework the consumer/producer
    
    - Ensure the consumer is mostly isolated in its own thread
    - Prevent closing the producer while an offset update is happening
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 190 +++++++++------------
 1 file changed, 84 insertions(+), 106 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index b731e9b0c1c..a527b66c9d7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,13 +23,13 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Cacheable;
@@ -41,7 +41,6 @@ import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,6 +49,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,12 +70,12 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
     private final ExecutorService executorService;
+    private final ReentrantLock lock = new ReentrantLock();
 
     /**
      * Builds an instance of this class
      *
      * @param resumeStrategyConfiguration the configuration to use for this strategy instance
-     *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
@@ -86,7 +86,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * Builds an instance of this class
      *
      * @param resumeStrategyConfiguration the configuration to use for this strategy instance
-     *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
                                          ExecutorService executorService) {
@@ -98,11 +97,10 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * Sends data to a topic. The records will always be sent asynchronously. If there's an error, a producer error
      * counter will be increased.
      *
-     * @see                         SingleNodeKafkaResumeStrategy#getProducerErrors()
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
-     *
+     * @see                         SingleNodeKafkaResumeStrategy#getProducerErrors()
      */
     protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
@@ -125,8 +123,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void updateLastOffset(T offset) throws Exception {
-        createProducer();
-
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
@@ -137,39 +133,63 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         ByteBuffer keyBuffer = key.serialize();
         ByteBuffer valueBuffer = offsetValue.serialize();
 
-        produce(keyBuffer.array(), valueBuffer.array());
+        try {
+            lock.lock();
+            produce(keyBuffer.array(), valueBuffer.array());
+        } finally {
+            lock.unlock();
+        }
 
         doAdd(key, offsetValue);
     }
 
     /**
      * Loads the existing data into the cache
-     *
-     * @throws Exception
      */
-    public void loadCache() throws Exception {
-        createConsumer();
-
-        subscribe();
-
-        LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
-
+    @Override
+    public void loadCache() {
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        executorService.submit(() -> refresh());
+        executorService.submit(this::refresh);
+    }
+
+    /**
+     * Launch a thread to refresh the offsets periodically
+     */
+    private void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+
+        try {
+            consumer = createConsumer();
+
+            subscribe(consumer);
+
+            LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
+            consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+            poll(consumer);
+        } catch (WakeupException e) {
+            LOG.info("Kafka consumer was interrupted during a blocking call");
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+        } finally {
+            if (consumer != null) {
+                consumer.unsubscribe();
+                consumer.close(Duration.ofSeconds(5));
+            }
+        }
     }
 
-    protected void poll() {
+    protected void poll(Consumer<byte[], byte[]> consumer) {
         Deserializable deserializable = (Deserializable) adapter;
 
-        ConsumerRecords<byte[], byte[]> records;
         do {
-            records = consume();
+            ConsumerRecords<byte[], byte[]> records = consume(consumer);
 
             if (records.isEmpty()) {
-                break;
+                continue;
             }
 
             for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -191,7 +211,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      *
      * @param topic the topic to consume the messages from
      */
-    protected void checkAndSubscribe(String topic) {
+    protected void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic) {
         if (!subscribed) {
             consumer.subscribe(Collections.singletonList(topic));
 
@@ -205,26 +225,18 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @param topic     the topic to consume the messages from
      * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
      */
-    public void checkAndSubscribe(String topic, long remaining) {
+    public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining) {
         if (!subscribed) {
-            consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(remaining));
-
+            consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(consumer, remaining));
             subscribed = true;
         }
     }
 
-    /**
-     * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
-     * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
-     *
-     * @param  remaining the number of remaining messages on the topic to try to collect
-     * @return
-     */
-    protected ConsumerRebalanceListener getConsumerRebalanceListener(long remaining) {
+    private ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining) {
         return new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-
+                // NO-OP
             }
 
             @Override
@@ -244,45 +256,15 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         };
     }
 
-    /**
-     * Unsubscribe from the topic
-     */
-    protected void unsubscribe() {
-        try {
-            consumer.unsubscribe();
-        } catch (IllegalStateException e) {
-            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}",
-                    resumeStrategyConfiguration.getTopic());
-        } catch (Exception e) {
-            LOG.error("Error unsubscribing from the Kafka topic {}: {}", resumeStrategyConfiguration.getTopic(), e.getMessage(),
-                    e);
-        }
-    }
-
     /**
      * Consumes message from the topic previously setup
      *
      * @return An instance of the consumer records
      */
-    protected ConsumerRecords<byte[], byte[]> consume() {
-        int retries = 10;
-
-        return consume(retries);
-    }
-
-    /**
-     * Consumes message from the topic previously setup
-     *
-     * @param  retries how many times to retry consuming data from the topic
-     * @return         An instance of the consumer records
-     */
-    protected ConsumerRecords<byte[], byte[]> consume(int retries) {
-        while (retries > 0) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(pollDuration);
-            if (!records.isEmpty()) {
-                return records;
-            }
-            retries--;
+    protected ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer) {
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollDuration);
+        if (!records.isEmpty()) {
+            return records;
         }
 
         return ConsumerRecords.empty();
@@ -307,17 +289,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return ConsumerRecords.empty();
     }
 
-    public void subscribe() throws Exception {
+    private void subscribe(Consumer<byte[], byte[]> consumer) {
         if (adapter instanceof Cacheable) {
             ResumeCache<?> cache = ((Cacheable) adapter).getCache();
 
             if (cache.capacity() >= 1) {
-                checkAndSubscribe(resumeStrategyConfiguration.getTopic(), cache.capacity());
+                checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic(), cache.capacity());
             } else {
-                checkAndSubscribe(resumeStrategyConfiguration.getTopic());
+                checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic());
             }
         } else {
-            checkAndSubscribe(resumeStrategyConfiguration.getTopic());
+            checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic());
         }
     }
 
@@ -356,21 +338,38 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         }
     }
 
-    private void createConsumer() {
-        if (consumer == null) {
-            consumer = new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
-        }
+    private Consumer<byte[], byte[]> createConsumer() {
+        return new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
     }
 
     @Override
     public void stop() {
-        LOG.info("Closing the Kafka producer");
-        IOHelper.close(producer, "Kafka producer", LOG);
+        try {
+            LOG.trace("Trying to obtain a lock for closing the producer");
+            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
+                LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
+            }
 
-        LOG.info("Closing the Kafka consumer");
-        IOHelper.close(producer, "Kafka consumer", LOG);
+            LOG.info("Closing the Kafka producer");
+            IOHelper.close(producer, "Kafka producer", LOG);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.unlock();
+        }
 
-        executorService.shutdown();
+        try {
+            LOG.info("Closing the Kafka consumer");
+            consumer.wakeup();
+            executorService.shutdown();
+
+            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+                LOG.warn("Kafka consumer did not shutdown within 2 seconds");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
@@ -381,6 +380,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     @Override
     public void start() {
         LOG.info("Starting the kafka resume strategy");
+        createProducer();
     }
 
     public Duration getPollDuration() {
@@ -391,10 +391,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         this.pollDuration = Objects.requireNonNull(pollDuration, "The poll duration cannot be null");
     }
 
-    protected Consumer<byte[], byte[]> getConsumer() {
-        return consumer;
-    }
-
     protected Producer<byte[], byte[]> getProducer() {
         return producer;
     }
@@ -410,22 +406,4 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return resumeStrategyConfiguration;
     }
 
-    /**
-     * Launch a thread to refresh the offsets periodically
-     */
-    private void refresh() {
-        LOG.trace("Creating a offset cache refresher");
-        try {
-            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
-            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
-            try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
-                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
-                poll();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
-        }
-    }
 }


[camel] 03/06: (chores) camel-kafka: add log guard on resume strategy critical path

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8bf4ae698469a4d4a4010410a4eb8d382becc07c
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Aug 8 16:16:37 2022 +0200

    (chores) camel-kafka: add log guard on resume strategy critical path
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 007e9d29bb7..b731e9b0c1c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -130,7 +130,9 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
-        LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
+        }
 
         ByteBuffer keyBuffer = key.serialize();
         ByteBuffer valueBuffer = offsetValue.serialize();
@@ -173,7 +175,9 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 byte[] value = record.value();
 
-                LOG.trace("Read from Kafka: {}", value);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Read from Kafka: {}", value);
+                }
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
                     LOG.warn("Deserializer indicates that this is the last record to deserialize");


[camel] 02/06: (chores) camel-kafka: fixed typo in the resume strategy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b4c8000991b63fbcd7e4cf72ffaa06791592bc09
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Aug 8 11:20:23 2022 +0200

    (chores) camel-kafka: fixed typo in the resume strategy
---
 .../camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index db87f65074a..007e9d29bb7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -176,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
                 LOG.trace("Read from Kafka: {}", value);
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
-                    LOG.warn("Deserializar indicates that this is the last record to deserialize");
+                    LOG.warn("Deserializer indicates that this is the last record to deserialize");
                 }
             }
         } while (true);


[camel] 05/06: (chores) camel-kafka: ensure tests run a little bit faster

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 28e1abba16c52b0b8d234ccc92c18110ca96e04a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Aug 4 09:12:38 2022 +0200

    (chores) camel-kafka: ensure tests run a little bit faster
    
    This ensures that the polls happen multiple times within the test execution window and also that the session timeout does not exceed it.
---
 .../processor/resume/kafka/MultiNodeKafkaResumeStrategy.java  | 11 -----------
 .../kafka/integration/KafkaConsumerAsyncManualCommitIT.java   |  2 +-
 .../kafka/integration/KafkaConsumerBadPortHealthCheckIT.java  |  2 +-
 .../KafkaConsumerBadPortSupervisingHealthCheckIT.java         |  2 +-
 .../component/kafka/integration/KafkaConsumerFullIT.java      |  2 +-
 .../kafka/integration/KafkaConsumerHealthCheckIT.java         |  2 +-
 .../kafka/integration/KafkaConsumerIdempotentIT.java          |  2 +-
 .../KafkaConsumerIdempotentWithCustomSerializerIT.java        |  2 +-
 .../integration/KafkaConsumerIdempotentWithProcessorIT.java   |  2 +-
 .../kafka/integration/KafkaConsumerTopicIsPatternIT.java      |  5 +++--
 .../integration/KafkaConsumerUnresolvableHealthCheckIT.java   |  2 +-
 .../kafka/integration/commit/KafkaConsumerNoopCommitIT.java   |  2 +-
 .../kafka/integration/commit/KafkaConsumerSyncCommitIT.java   |  2 +-
 .../pause/KafkaPausableConsumerCircuitBreakerIT.java          |  2 +-
 .../kafka/integration/pause/KafkaPausableConsumerIT.java      |  2 +-
 15 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 82580200f03..fd76c7a6046 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -17,20 +17,10 @@
 
 package org.apache.camel.processor.resume.kafka;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Resumable;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,5 +57,4 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
         super(resumeStrategyConfiguration, executorService);
     }
 
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index bfda5a5a5e8..6b94fb6454a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -55,7 +55,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerAsyncManualCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
index 0ca75d0b2b4..c9a8402cf66 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
@@ -72,7 +72,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
index 31727cae3fd..d210fe298ad 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -74,7 +74,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 46e304d0613..9b857342181 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -59,7 +59,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("mock:result")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index bfac711c265..74fbc8fdbf6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -79,7 +79,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index 6ad1dba93ce..36de458fcfe 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -45,7 +45,7 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor
                     + "?groupId=group2&autoOffsetReset=earliest"
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index 2e80a8d23b7..2095aafdc0f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -41,7 +41,7 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 4a5f9ff685f..d68eddfa296 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -40,7 +40,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot
                     + "?groupId=group2&autoOffsetReset=earliest"
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
index a178095e280..bbae0f74910 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
@@ -38,8 +38,9 @@ public class KafkaConsumerTopicIsPatternIT extends BaseEmbeddedKafkaTestSupport
     public static final String TOPIC = "vess123d";
     public static final String TOPIC_PATTERN = "v.*d";
 
-    @EndpointInject("kafka:" + TOPIC_PATTERN + "?topicIsPattern=true&groupId=group1&autoOffsetReset=earliest"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000")
+    @EndpointInject("kafka:" + TOPIC_PATTERN
+                    + "?topicIsPattern=true&groupId=KafkaConsumerTopicIsPatternIT&autoOffsetReset=earliest"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000")
     private Endpoint from;
 
     @EndpointInject("mock:result")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
index c0395ec7f6c..5f34c34997c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -72,7 +72,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
index a1daccb40df..d69387a6460 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
@@ -35,7 +35,7 @@ public class KafkaConsumerNoopCommitIT extends BaseManualCommitTestSupport {
     public static final String TOPIC = "testManualNoopCommitTest";
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerNoopCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&metadataMaxAgeMs=1000")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
index a6e4cddb173..e9967641009 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
@@ -34,7 +34,7 @@ public class KafkaConsumerSyncCommitIT extends BaseManualCommitTestSupport {
     public static final String TOPIC = "testManualCommitSyncTest";
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerSyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
index 89501aa7c5e..1b813178602 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
@@ -63,7 +63,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
     @EndpointInject("kafka:" + SOURCE_TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("direct:intermediate")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
index 369bd194e9f..a30f21a252b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -81,7 +81,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
     @EndpointInject("kafka:" + SOURCE_TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("direct:intermediate")


[camel] 06/06: CAMEL-18362: wait for the initial cache loading

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5fedf91e806a394635e2d3837d45c0d849f2b0fe
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 16:58:49 2022 +0200

    CAMEL-18362: wait for the initial cache loading
---
 .../kafka/KafkaResumeStrategyConfiguration.java    | 23 ++++++++++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 17 ++++++++++
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 37 ++++++++++++++++------
 3 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 2e314079223..94aede6fadd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import org.apache.camel.resume.ResumeStrategyConfiguration;
@@ -28,6 +29,8 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration;
+    private int maxInitializationRetries;
 
     public Properties getProducerProperties() {
         return producerProperties;
@@ -58,4 +61,24 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
 
         this.topic = topic;
     }
+
+    public Duration getMaxInitializationDuration() {
+        return maxInitializationDuration;
+    }
+
+    public void setMaxInitializationDuration(Duration maxInitializationDuration) {
+        this.maxInitializationDuration = maxInitializationDuration;
+    }
+
+    public int getMaxInitializationRetries() {
+        return maxInitializationRetries;
+    }
+
+    public void setMaxInitializationRetries(int maxInitializationRetries) {
+        if (maxInitializationRetries < 1) {
+            throw new IllegalArgumentException("The maximum number of initialization retries must be equal or bigger than 1");
+        }
+
+        this.maxInitializationRetries = maxInitializationRetries;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 150936148d0..3c4e6a21ca5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -43,6 +44,8 @@ public class KafkaResumeStrategyConfigurationBuilder
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration = Duration.ofSeconds(10);
+    private int maxInitializationRetries = 5;
 
     private KafkaResumeStrategyConfigurationBuilder() {
     }
@@ -103,6 +106,18 @@ public class KafkaResumeStrategyConfigurationBuilder
         return this;
     }
 
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationDuration(Duration duration) {
+        this.maxInitializationDuration = duration;
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationRetries(int retries) {
+        this.maxInitializationRetries = retries;
+
+        return this;
+    }
+
     /**
      * Creates a basic consumer
      *
@@ -138,6 +153,8 @@ public class KafkaResumeStrategyConfigurationBuilder
         resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
         resumeStrategyConfiguration.setProducerProperties(producerProperties);
         resumeStrategyConfiguration.setTopic(topic);
+        resumeStrategyConfiguration.setMaxInitializationDuration(maxInitializationDuration);
+        resumeStrategyConfiguration.setMaxInitializationRetries(maxInitializationRetries);
 
         return resumeStrategyConfiguration;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index a527b66c9d7..9a60cce760f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -20,11 +20,13 @@ package org.apache.camel.processor.resume.kafka;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -152,13 +154,24 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        executorService.submit(this::refresh);
+        CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+        executorService.submit(() -> refresh(latch));
+
+        try {
+            LOG.trace("Waiting for kafka resume strategy async initialization");
+            if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+                LOG.debug("The initialization timed out");
+            }
+            LOG.trace("Kafka resume strategy initialization complete");
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
      * Launch a thread to refresh the offsets periodically
      */
-    private void refresh() {
+    private void refresh(CountDownLatch latch) {
         LOG.trace("Creating a offset cache refresher");
 
         try {
@@ -169,7 +182,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
             consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
 
-            poll(consumer);
+            poll(consumer, latch);
         } catch (WakeupException e) {
             LOG.info("Kafka consumer was interrupted during a blocking call");
         } catch (Exception e) {
@@ -182,27 +195,33 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         }
     }
 
-    protected void poll(Consumer<byte[], byte[]> consumer) {
+    protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) {
         Deserializable deserializable = (Deserializable) adapter;
+        boolean initialized = false;
 
         do {
             ConsumerRecords<byte[], byte[]> records = consume(consumer);
 
-            if (records.isEmpty()) {
-                continue;
-            }
-
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 byte[] value = record.value();
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Read from Kafka: {}", value);
+                    LOG.trace("Read from Kafka at {} ({}): {}", Instant.ofEpochMilli(record.timestamp()),
+                            record.timestampType(), value);
                 }
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
                     LOG.warn("Deserializer indicates that this is the last record to deserialize");
                 }
             }
+
+            if (!initialized) {
+                if (latch.getCount() == 1) {
+                    initialized = true;
+                }
+
+                latch.countDown();
+            }
         } while (true);
     }