You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/17 14:13:20 UTC
[camel] 03/04: CAMEL-18128: allow determining the desirable resume cache fill policy
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e8896bc3ab473086efd504d52542414b03a67b01
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 16 17:26:26 2022 +0200
CAMEL-18128: allow determining the desirable resume cache fill policy
---
.../kafka/SingleNodeKafkaResumeStrategy.java | 10 +++++++++
.../java/org/apache/camel/resume/Cacheable.java | 24 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 71d8ad26781..4de3f0acc7a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -199,6 +199,16 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
* @throws Exception
*/
public void loadCache() throws Exception {
+ if (adapter instanceof Cacheable) {
+ Cacheable cacheable = (Cacheable) adapter;
+
+ if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
+ consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ } else {
+ consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ }
+ }
+
createConsumer();
subscribe();
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
index 157d3689fe0..e11337c4464 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
@@ -24,6 +24,21 @@ import org.apache.camel.resume.cache.ResumeCache;
*/
public interface Cacheable {
+ /**
+ * The cache fill policy can be used to determine how this cache should be filled with data.
+ */
+ enum FillPolicy {
+ /**
+ * With MAXIMIZING, entities try to maximize cache usage and fill it with as much data as possible.
+ */
+ MAXIMIZING,
+
+ /**
+ * With MINIMIZING, entities should fill it with as little data as reasonable.
+ */
+ MINIMIZING,
+ }
+
/**
* Adds an offset key and value to the cache
*
@@ -46,4 +61,13 @@ public interface Cacheable {
* @return A resume cache instance
*/
ResumeCache<?> getCache();
+
+ /**
+ * Gets the {@link FillPolicy} for this cache instance
+ *
+ * @return the fill policy set for this instance (defaults to FillPolicy.MAXIMIZING)
+ */
+ default FillPolicy getFillPolicy() {
+ return FillPolicy.MAXIMIZING;
+ }
}