You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/17 14:13:20 UTC

[camel] 03/04: CAMEL-18128: allow determining the desirable resume cache fill policy

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e8896bc3ab473086efd504d52542414b03a67b01
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 16 17:26:26 2022 +0200

    CAMEL-18128: allow determining the desirable resume cache fill policy
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 10 +++++++++
 .../java/org/apache/camel/resume/Cacheable.java    | 24 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 71d8ad26781..4de3f0acc7a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -199,6 +199,16 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
+        if (adapter instanceof Cacheable) {
+            Cacheable cacheable = (Cacheable) adapter;
+
+            if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
+                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else {
+                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+            }
+        }
+
         createConsumer();
 
         subscribe();
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
index 157d3689fe0..e11337c4464 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
@@ -24,6 +24,21 @@ import org.apache.camel.resume.cache.ResumeCache;
  */
 public interface Cacheable {
 
+    /**
+     * The cache fill policy can be used to determine how this cache should be filled with data.
+     */
+    enum FillPolicy {
+        /**
+         * With MAXIMIZING, entities try to maximize cache usage and fill it with as much data as possible
+         */
+        MAXIMIZING,
+
+        /**
+         * With MINIMIZING, entities should fill it with as little data as reasonable.
+         */
+        MINIMIZING,
+    }
+
     /**
      * Adds an offset key and value to the cache
      * 
@@ -46,4 +61,13 @@ public interface Cacheable {
      * @return A resume cache instance
      */
     ResumeCache<?> getCache();
+
+    /**
+     * Gets the {@Link FillPolicy} for this cache instance
+     *
+     * @return the fill policy set for this instance FillPolicy.MAXIMIZING
+     */
+    default FillPolicy getFillPolicy() {
+        return FillPolicy.MAXIMIZING;
+    }
 }