You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 15:01:16 UTC

[camel] branch main updated (6a731f11d31 -> ae1b727400e)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 6a731f11d31 Regen for commit a080904e750d8e1c656dada4681e142618be0d1b (#8142)
     new 7a50e23dde3 CAMEL-18371: do load the cache when resuming with the file component
     new ae1b727400e CAMEL-18362: wait for the initial cache loading

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../adapters/FileResumeAdapterDelegate.java        |  4 ++-
 .../kafka/KafkaResumeStrategyConfiguration.java    | 23 ++++++++++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 17 ++++++++++
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 37 ++++++++++++++++------
 4 files changed, 71 insertions(+), 10 deletions(-)


[camel] 01/02: CAMEL-18371: do load the cache when resuming with the file component

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7a50e23dde3f986a1d55560047d7ee432369314f
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 16:54:18 2022 +0200

    CAMEL-18371: do load the cache when resuming with the file component
---
 .../component/file/consumer/adapters/FileResumeAdapterDelegate.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
index f00229329f6..c5f0ffaa2c9 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -30,6 +30,8 @@ import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.spi.annotations.JdkService;
+import org.apache.camel.support.resume.OffsetKeys;
+import org.apache.camel.support.resume.Offsets;
 
 @JdkService("file-adapter-factory")
 public class FileResumeAdapterDelegate
@@ -83,7 +85,7 @@ public class FileResumeAdapterDelegate
             fileOffsetResumeAdapter.deserializeFileOffset((File) keyObj, (Long) valueObj);
         }
 
-        return true;
+        return add(OffsetKeys.of(keyObj), Offsets.of(valueObj));
     }
 
     @Override


[camel] 02/02: CAMEL-18362: wait for the initial cache loading

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ae1b727400e707b2d7b310747ab062febf50619b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 16:58:49 2022 +0200

    CAMEL-18362: wait for the initial cache loading
---
 .../kafka/KafkaResumeStrategyConfiguration.java    | 23 ++++++++++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 17 ++++++++++
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 37 ++++++++++++++++------
 3 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 2e314079223..94aede6fadd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import org.apache.camel.resume.ResumeStrategyConfiguration;
@@ -28,6 +29,8 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration;
+    private int maxInitializationRetries;
 
     public Properties getProducerProperties() {
         return producerProperties;
@@ -58,4 +61,24 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
 
         this.topic = topic;
     }
+
+    public Duration getMaxInitializationDuration() {
+        return maxInitializationDuration;
+    }
+
+    public void setMaxInitializationDuration(Duration maxInitializationDuration) {
+        this.maxInitializationDuration = maxInitializationDuration;
+    }
+
+    public int getMaxInitializationRetries() {
+        return maxInitializationRetries;
+    }
+
+    public void setMaxInitializationRetries(int maxInitializationRetries) {
+        if (maxInitializationRetries < 1) {
+            throw new IllegalArgumentException("The maximum number of initialization retries must be equal or bigger than 1");
+        }
+
+        this.maxInitializationRetries = maxInitializationRetries;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 150936148d0..3c4e6a21ca5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -43,6 +44,8 @@ public class KafkaResumeStrategyConfigurationBuilder
     private Properties producerProperties;
     private Properties consumerProperties;
     private String topic;
+    private Duration maxInitializationDuration = Duration.ofSeconds(10);
+    private int maxInitializationRetries = 5;
 
     private KafkaResumeStrategyConfigurationBuilder() {
     }
@@ -103,6 +106,18 @@ public class KafkaResumeStrategyConfigurationBuilder
         return this;
     }
 
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationDuration(Duration duration) {
+        this.maxInitializationDuration = duration;
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withMaxInitializationRetries(int retries) {
+        this.maxInitializationRetries = retries;
+
+        return this;
+    }
+
     /**
      * Creates a basic consumer
      *
@@ -138,6 +153,8 @@ public class KafkaResumeStrategyConfigurationBuilder
         resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
         resumeStrategyConfiguration.setProducerProperties(producerProperties);
         resumeStrategyConfiguration.setTopic(topic);
+        resumeStrategyConfiguration.setMaxInitializationDuration(maxInitializationDuration);
+        resumeStrategyConfiguration.setMaxInitializationRetries(maxInitializationRetries);
 
         return resumeStrategyConfiguration;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index a527b66c9d7..9a60cce760f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -20,11 +20,13 @@ package org.apache.camel.processor.resume.kafka;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -152,13 +154,24 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        executorService.submit(this::refresh);
+        CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+        executorService.submit(() -> refresh(latch));
+
+        try {
+            LOG.trace("Waiting for kafka resume strategy async initialization");
+            if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+                LOG.debug("The initialization timed out");
+            }
+            LOG.trace("Kafka resume strategy initialization complete");
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
      * Launch a thread to refresh the offsets periodically
      */
-    private void refresh() {
+    private void refresh(CountDownLatch latch) {
         LOG.trace("Creating a offset cache refresher");
 
         try {
@@ -169,7 +182,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
             consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
 
-            poll(consumer);
+            poll(consumer, latch);
         } catch (WakeupException e) {
             LOG.info("Kafka consumer was interrupted during a blocking call");
         } catch (Exception e) {
@@ -182,27 +195,33 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         }
     }
 
-    protected void poll(Consumer<byte[], byte[]> consumer) {
+    protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) {
         Deserializable deserializable = (Deserializable) adapter;
+        boolean initialized = false;
 
         do {
             ConsumerRecords<byte[], byte[]> records = consume(consumer);
 
-            if (records.isEmpty()) {
-                continue;
-            }
-
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 byte[] value = record.value();
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Read from Kafka: {}", value);
+                    LOG.trace("Read from Kafka at {} ({}): {}", Instant.ofEpochMilli(record.timestamp()),
+                            record.timestampType(), value);
                 }
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
                     LOG.warn("Deserializer indicates that this is the last record to deserialize");
                 }
             }
+
+            if (!initialized) {
+                if (latch.getCount() == 1) {
+                    initialized = true;
+                }
+
+                latch.countDown();
+            }
         } while (true);
     }