You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 15:01:18 UTC

[camel] 02/02: CAMEL-18362: wait for the initial cache loading

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ae1b727400e707b2d7b310747ab062febf50619b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 16:58:49 2022 +0200

    CAMEL-18362: wait for the initial cache loading
---
 .../kafka/KafkaResumeStrategyConfiguration.java    | 23 ++++++++++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 17 ++++++++++
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 37 ++++++++++++++++------
 3 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 2e314079223..94aede6fadd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import org.apache.camel.resume.ResumeStrategyConfiguration;
@@ -28,6 +29,8 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration;
+    private int maxInitializationRetries;
 
     public Properties getProducerProperties() {
         return producerProperties;
@@ -58,4 +61,24 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
 
         this.topic = topic;
     }
+
+    public Duration getMaxInitializationDuration() {
+        return maxInitializationDuration;
+    }
+
+    public void setMaxInitializationDuration(Duration maxInitializationDuration) {
+        this.maxInitializationDuration = maxInitializationDuration;
+    }
+
+    public int getMaxInitializationRetries() {
+        return maxInitializationRetries;
+    }
+
+    public void setMaxInitializationRetries(int maxInitializationRetries) {
+        if (maxInitializationRetries < 1) {
+            throw new IllegalArgumentException("The maximum number of initialization retries must be equal or bigger than 1");
+        }
+
+        this.maxInitializationRetries = maxInitializationRetries;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 150936148d0..3c4e6a21ca5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -43,6 +44,8 @@ public class KafkaResumeStrategyConfigurationBuilder
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration = Duration.ofSeconds(10);
+    private int maxInitializationRetries = 5;
 
     private KafkaResumeStrategyConfigurationBuilder() {
     }
@@ -103,6 +106,18 @@ public class KafkaResumeStrategyConfigurationBuilder
         return this;
     }
 
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationDuration(Duration duration) {
+        this.maxInitializationDuration = duration;
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationRetries(int retries) {
+        this.maxInitializationRetries = retries;
+
+        return this;
+    }
+
     /**
      * Creates a basic consumer
      *
@@ -138,6 +153,8 @@ public class KafkaResumeStrategyConfigurationBuilder
         resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
         resumeStrategyConfiguration.setProducerProperties(producerProperties);
         resumeStrategyConfiguration.setTopic(topic);
+        resumeStrategyConfiguration.setMaxInitializationDuration(maxInitializationDuration);
+        resumeStrategyConfiguration.setMaxInitializationRetries(maxInitializationRetries);
 
         return resumeStrategyConfiguration;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index a527b66c9d7..9a60cce760f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -20,11 +20,13 @@ package org.apache.camel.processor.resume.kafka;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -152,13 +154,24 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        executorService.submit(this::refresh);
+        CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+        executorService.submit(() -> refresh(latch));
+
+        try {
+            LOG.trace("Waiting for kafka resume strategy async initialization");
+            if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+                LOG.debug("The initialization timed out");
+            }
+            LOG.trace("Kafka resume strategy initialization complete");
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
      * Launch a thread to refresh the offsets periodically
      */
-    private void refresh() {
+    private void refresh(CountDownLatch latch) {
         LOG.trace("Creating a offset cache refresher");
 
         try {
@@ -169,7 +182,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
             consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
 
-            poll(consumer);
+            poll(consumer, latch);
         } catch (WakeupException e) {
             LOG.info("Kafka consumer was interrupted during a blocking call");
         } catch (Exception e) {
@@ -182,27 +195,33 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         }
     }
 
-    protected void poll(Consumer<byte[], byte[]> consumer) {
+    protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) {
         Deserializable deserializable = (Deserializable) adapter;
+        boolean initialized = false;
 
         do {
             ConsumerRecords<byte[], byte[]> records = consume(consumer);
 
-            if (records.isEmpty()) {
-                continue;
-            }
-
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 byte[] value = record.value();
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Read from Kafka: {}", value);
+                    LOG.trace("Read from Kafka at {} ({}): {}", Instant.ofEpochMilli(record.timestamp()),
+                            record.timestampType(), value);
                 }
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
                     LOG.warn("Deserializer indicates that this is the last record to deserialize");
                 }
             }
+
+            if (!initialized) {
+                if (latch.getCount() == 1) {
+                    initialized = true;
+                }
+
+                latch.countDown();
+            }
         } while (true);
     }