You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/03 13:14:52 UTC
[camel] 02/02: CAMEL-17762: rework the resume strategy to separate concerns
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a68c2607e5fd2cb35b4d11828ba830240212a9d8
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 29 16:14:39 2022 +0200
CAMEL-17762: rework the resume strategy to separate concerns
Rework the strategy so that the handling of the strategy service is
separate from the logic that handles the component-specific resume.
This brings builtin caches, improved reusable Kafka resume strategies and
additional cleanups to the resume API interfaces, classes and
documentation.
---
.../org/apache/camel/catalog/models/pausable.json | 2 +-
.../org/apache/camel/catalog/models/resumable.json | 2 +-
.../apache/camel/component/feed/EntryFilter.java | 14 +-
.../component/feed/FeedEntryPollingConsumer.java | 24 ++-
.../AtomEntryPollingConsumerWithResumeTest.java | 3 +-
.../component/aws2/kinesis/Kinesis2Consumer.java | 26 ++-
...sumeStrategy.java => KinesisResumeAdapter.java} | 23 +--
... => KinesisUserConfigurationResumeAdapter.java} | 4 +-
.../caffeine/resume/multi/CaffeineCache.java | 98 +++++++++
.../caffeine/resume/single/CaffeineCache.java | 93 +++++++++
.../component/cassandra/CassandraConsumer.java | 20 +-
...meStrategy.java => CassandraResumeAdapter.java} | 16 +-
.../CassandraComponentResumeStrategyIT.java | 9 +-
.../component/couchbase/CouchbaseConsumer.java | 18 +-
...meStrategy.java => CouchbaseResumeAdapter.java} | 16 +-
.../integration/ConsumeResumeStrategyIT.java | 16 +-
components/camel-couchdb/pom.xml | 4 +
.../component/couchdb/CouchDbChangesetTracker.java | 15 +-
.../camel/component/couchdb/CouchDbConsumer.java | 12 +-
.../couchdb/consumer/CouchDbResumable.java | 4 +-
...sumeStrategy.java => CouchDbResumeAdapter.java} | 21 +-
.../consumer/CouchDbResumeStrategyFactory.java | 8 +-
...java => LatestUpdateSequenceResumeAdapter.java} | 4 +-
.../apache/camel/component/file/FileConsumer.java | 37 ++--
.../apache/camel/component/file/GenericFile.java | 2 +-
...rResumeStrategy.java => FileResumeAdapter.java} | 6 +-
.../component/file/consumer/FileResumeSet.java | 2 +-
...sumeStrategy.java => FileSetResumeAdapter.java} | 7 +-
.../file/consumer/GenericFileResumable.java | 2 +-
...Strategy.java => GenericFileResumeAdapter.java} | 15 +-
.../adapters/DefaultFileSetResumeAdapter.java | 65 ++++++
.../adapters/DefaultGenericFileResumeAdapter.java | 64 ++++++
.../camel/component/kafka/KafkaConsumer.java | 14 +-
.../camel/component/kafka/KafkaFetchRecords.java | 4 +-
.../errorhandler/KafkaConsumerListener.java | 2 +-
...rategy.java => KafkaConsumerResumeAdapter.java} | 33 +--
.../kafka/consumer/support/KafkaOffset.java | 2 +-
.../kafka/consumer/support/KafkaResumable.java | 4 +-
....java => OffsetKafkaConsumerResumeAdapter.java} | 21 +-
.../support/PartitionAssignmentListener.java | 11 +-
.../consumer/support/ResumeStrategyFactory.java | 30 ++-
...a => SeekPolicyKafkaConsumerResumeAdapter.java} | 20 +-
.../resume/kafka/KafkaResumeStrategy.java | 22 +-
.../resume/kafka/MultiNodeKafkaResumeStrategy.java | 152 ++++++++++++++
.../camel/processor/resume/kafka/RecordError.java} | 31 ++-
...egy.java => SingleNodeKafkaResumeStrategy.java} | 227 ++++++++++++++-------
.../KafkaConsumerWithResumeRouteStrategyIT.java | 99 ++++-----
.../camel/component/master/MasterConsumer.java | 4 +-
.../src/main/java/org/apache/camel/Route.java | 2 +
.../camel/{ => resume}/ConsumerListener.java | 2 +-
.../camel/{ => resume}/ConsumerListenerAware.java | 2 +-
.../java/org/apache/camel/{ => resume}/Offset.java | 2 +-
.../org/apache/camel/{ => resume}/Resumable.java | 2 +-
.../apache/camel/{ => resume}/ResumableSet.java | 2 +-
.../ResumeAdapter.java} | 16 +-
.../org/apache/camel/{ => resume}/ResumeAware.java | 2 +-
.../apache/camel/{ => resume}/ResumeStrategy.java | 23 ++-
.../UpdatableConsumerResumeStrategy.java | 2 +-
.../apache/camel/resume/cache/MultiEntryCache.java | 11 +-
.../camel/{ => resume/cache}/ResumeCache.java | 20 +-
.../cache/SingleEntryCache.java} | 21 +-
.../org/apache/camel/impl/engine/DefaultRoute.java | 8 +-
.../docs/modules/eips/pages/resume-strategies.adoc | 139 ++++---------
.../resources/org/apache/camel/model/pausable.json | 2 +-
.../org/apache/camel/model/resumable.json | 2 +-
.../org/apache/camel/model/PausableDefinition.java | 4 +-
.../apache/camel/model/ProcessorDefinition.java | 4 +-
.../apache/camel/model/ResumableDefinition.java | 4 +-
.../apache/camel/processor/PausableProcessor.java | 2 +-
.../processor/resume/DelegatingResumeAdapter.java | 181 ++++++++++++++++
.../processor/resume/DelegatingResumeStrategy.java | 102 ---------
.../processor/resume/ResumableCompletion.java | 6 +-
.../camel/processor/resume/ResumableProcessor.java | 9 +-
.../processor/resume/TransientResumeStrategy.java | 33 +--
.../org/apache/camel/reifier/PausableReifier.java | 2 +-
.../org/apache/camel/reifier/ResumableReifier.java | 2 +-
.../FileConsumerResumeFromOffsetStrategyTest.java | 27 ++-
.../file/FileConsumerResumeStrategyTest.java | 17 +-
.../main/java/org/apache/camel/resume/Offsets.java | 2 -
.../java/org/apache/camel/resume/Resumables.java | 3 -
80 files changed, 1304 insertions(+), 680 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
@@ -12,7 +12,7 @@
"output": false
},
"properties": {
- "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+ "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
"untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
index acbea228865..03a0a440e46 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
@@ -12,7 +12,7 @@
"output": false
},
"properties": {
- "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+ "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
}
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
index b477e1bb372..b0a4dd6e5a3 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
@@ -16,14 +16,14 @@
*/
package org.apache.camel.component.feed;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
/**
* Filter used by the {@link org.apache.camel.component.feed.FeedEntryPollingConsumer} to filter entries from the feed.
*
* @param <E> entry type
*/
-public interface EntryFilter<E> extends ResumeStrategy {
+public interface EntryFilter<E> extends ResumeAdapter {
/**
* Tests to be used as filtering the feed for only entries of interest, such as only new entries, etc.
@@ -37,14 +37,4 @@ public interface EntryFilter<E> extends ResumeStrategy {
default void resume() {
// NO-OP by default. Implementations can implement more complex behaviors if needed
}
-
- @Override
- default void start() {
- // NO-OP
- }
-
- @Override
- default void stop() {
- // NO-OP
- }
}
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
index d8bcfcf255e..035e7ea4f15 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
@@ -20,14 +20,16 @@ import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
/**
* Consumer to poll feeds and return each entry from the feed step by step.
*/
-public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<EntryFilter<E>> {
+public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<ResumeStrategy> {
protected int entryIndex;
- protected EntryFilter<E> entryFilter;
+ protected ResumeStrategy resumeStrategy;
@SuppressWarnings("rawtypes")
protected List<E> list;
protected boolean throttleEntries;
@@ -52,8 +54,12 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
polledMessages++;
boolean valid = true;
- if (entryFilter != null) {
- valid = entryFilter.isValidEntry(entry);
+ if (resumeStrategy != null) {
+ ResumeAdapter adapter = resumeStrategy.getAdapter();
+
+ if (adapter instanceof EntryFilter) {
+ valid = ((EntryFilter) adapter).isValidEntry(entry);
+ }
}
if (valid) {
Exchange exchange = endpoint.createExchange(feed, entry);
@@ -73,13 +79,13 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
}
@Override
- public void setResumeStrategy(EntryFilter<E> resumeStrategy) {
- this.entryFilter = resumeStrategy;
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+ this.resumeStrategy = resumeStrategy;
}
@Override
- public EntryFilter<E> getResumeStrategy() {
- return entryFilter;
+ public ResumeStrategy getResumeStrategy() {
+ return resumeStrategy;
}
protected abstract void resetList();
diff --git a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
index b39e4d957c7..4c810f8a65e 100644
--- a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
+++ b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
@@ -22,6 +22,7 @@ import java.util.Date;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -57,7 +58,7 @@ public class AtomEntryPollingConsumerWithResumeTest extends CamelTestSupport {
return new RouteBuilder() {
public void configure() {
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=500")
- .resumable().resumeStrategy(new UpdatedDateFilter(getTestFilterDate()))
+ .resumable().resumeStrategy(new TransientResumeStrategy(new UpdatedDateFilter(getTestFilterDate())))
.routeId("WithResume")
.to("mock:result");
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 1925b6ae57e..de0317df604 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -23,9 +23,10 @@ import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -41,13 +42,13 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
-public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<KinesisResumeStrategy> {
+public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
private String currentShardIterator;
private boolean isShardClosed;
- private KinesisResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -173,12 +174,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
}
private void resume(GetShardIteratorRequest.Builder req) {
- if (resumeStrategy == null) {
- resumeStrategy = new KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration());
+ KinesisResumeAdapter adapter;
+ if (resumeStrategy != null) {
+ adapter = resumeStrategy.getAdapter(KinesisResumeAdapter.class);
+ } else {
+ adapter = new KinesisUserConfigurationResumeAdapter(getEndpoint().getConfiguration());
}
- resumeStrategy.setRequestBuilder(req);
- resumeStrategy.resume();
+ adapter.setRequestBuilder(req);
+ adapter.resume();
}
private Queue<Exchange> createExchanges(List<Record> records) {
@@ -203,12 +207,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
}
@Override
- public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
@Override
- public KinesisResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
similarity index 75%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
index 0592bfbaa09..b4f26b437eb 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
@@ -17,20 +17,17 @@
package org.apache.camel.component.aws2.kinesis.consumer;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-public interface KinesisResumeStrategy extends ResumeStrategy {
-
+/**
+ * The resume adapter for Kinesis
+ */
+public interface KinesisResumeAdapter extends ResumeAdapter {
+ /**
+ * Sets the shard iterator request builder that can be used to customize the call and set the exact resume point
+ *
+ * @param builder the builder instance
+ */
void setRequestBuilder(GetShardIteratorRequest.Builder builder);
-
- @Override
- default void start() {
-
- }
-
- @Override
- default void stop() {
-
- }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
similarity index 91%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
index 56334437770..5e47dfb817b 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
@@ -21,11 +21,11 @@ import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStrategy {
+public class KinesisUserConfigurationResumeAdapter implements KinesisResumeAdapter {
private final Kinesis2Configuration configuration;
private GetShardIteratorRequest.Builder resumable;
- public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration configuration) {
+ public KinesisUserConfigurationResumeAdapter(Kinesis2Configuration configuration) {
this.configuration = configuration;
}
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
new file mode 100644
index 00000000000..e775be5d063
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.multi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache that can store multiple key/valued resumables where the value is a list containing multiple values
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class CaffeineCache<K, V> implements MultiEntryCache<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(CaffeineCache.class);
+
+ private final Cache<K, List<V>> cache;
+ private final long cacheSize;
+
+ /**
+ * Builds a new instance of this object with the given cache size
+ *
+ * @param cacheSize the size of the cache
+ */
+ public CaffeineCache(long cacheSize) {
+ this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+ }
+
+ /**
+ * Builds a new instance of this object
+ *
+ * @param cache an instance of a pre-constructed cache object
+ * @param cacheSize the size of the pre-constructed cache object
+ */
+ public CaffeineCache(Cache<K, List<V>> cache, long cacheSize) {
+ this.cache = cache;
+ this.cacheSize = cacheSize;
+ }
+
+ @Override
+ public long capacity() {
+ return cacheSize;
+ }
+
+ @Override
+ public synchronized void add(K key, V offsetValue) {
+ LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
+ LOG.trace("Adding entry to the cache (k/v) with types: {}/{}", key.getClass(), offsetValue.getClass());
+ List<V> entries = cache.get(key, k -> new ArrayList<>());
+
+ entries.add(offsetValue);
+ }
+
+ @Override
+ public synchronized boolean contains(K key, V entry) {
+ final List<V> entries = cache.getIfPresent(key);
+
+ if (entries == null) {
+ return false;
+ }
+
+ boolean ret = entries.contains(entry);
+ LOG.trace("Checking if cache contains key {} with value {} ({})", key, entry, ret);
+
+ return ret;
+ }
+
+ @Override
+ public boolean isFull() {
+ if (cache.estimatedSize() >= cacheSize) {
+ return true;
+ }
+
+ return false;
+ }
+
+}
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
new file mode 100644
index 00000000000..d551418cf38
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.single;
+
+import java.util.Optional;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * This is a simple cache implementation that uses Caffeine to store the resume offsets
+ *
+ * @param <K> The type of the key to cache
+ * @param <V> The type of the value/entry to cache
+ */
+public class CaffeineCache<K, V> implements SingleEntryCache<K, V> {
+ private final Cache<K, V> cache;
+ private final long cacheSize;
+
+ /**
+ * Builds a new cache with the given cache size
+ *
+ * @param cacheSize the cache size
+ */
+ public CaffeineCache(long cacheSize) {
+ this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+ }
+
+ /**
+ * Builds a new instance of this object
+ *
+ * @param cache an instance of a pre-constructed cache object
+ * @param cacheSize the size of the pre-constructed cache object
+ */
+ public CaffeineCache(Cache<K, V> cache, long cacheSize) {
+ this.cache = cache;
+ this.cacheSize = cacheSize;
+ }
+
+ @Override
+ public boolean contains(K key, V entry) {
+ assert key != null;
+ V cachedEntry = cache.getIfPresent(key);
+
+ return entry.equals(cachedEntry);
+ }
+
+ @Override
+ public void add(K key, V offsetValue) {
+ cache.put(key, offsetValue);
+ }
+
+ @Override
+ public Optional<V> get(K key) {
+ V entry = cache.getIfPresent(key);
+
+ if (entry == null) {
+ return Optional.empty();
+ }
+
+ return Optional.of(entry);
+ }
+
+ @Override
+ public boolean isFull() {
+ if (cache.estimatedSize() >= cacheSize) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public long capacity() {
+ return cacheSize;
+ }
+}
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 659bc4cec9c..f3bb3bf971e 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,21 +22,22 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledPollConsumer;
/**
* Cassandra 2 CQL3 consumer.
*/
-public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<CassandraResumeStrategy> {
+public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
/**
* Prepared statement used for polling
*/
private PreparedStatement preparedStatement;
- private CassandraResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
public CassandraConsumer(CassandraEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -86,8 +87,11 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
if (resumeStrategy != null) {
CqlSession session = getEndpoint().getSessionHolder().getSession();
- resumeStrategy.setSession(session);
- resumeStrategy.resume();
+ CassandraResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CassandraResumeAdapter.class);
+ if (resumeAdapter != null) {
+ resumeAdapter.setSession(session);
+ resumeAdapter.resume();
+ }
}
}
@@ -102,12 +106,12 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
}
@Override
- public CassandraResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
@Override
- public void setResumeStrategy(CassandraResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
}
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
similarity index 81%
rename from components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
rename to components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
index 220242c7b2c..e8ccbed8efe 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
@@ -18,12 +18,12 @@
package org.apache.camel.component.cassandra.consumer.support;
import com.datastax.oss.driver.api.core.CqlSession;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
/**
- * Provides a resume strategy for Cassandra consumers
+ * Provides a resume adapter for Cassandra consumers
*/
-public interface CassandraResumeStrategy extends ResumeStrategy {
+public interface CassandraResumeAdapter extends ResumeAdapter {
/**
* Sets the session that allow implementations to run a one-time query on the DB
@@ -31,14 +31,4 @@ public interface CassandraResumeStrategy extends ResumeStrategy {
* @param session
*/
void setSession(CqlSession session);
-
- @Override
- default void start() {
-
- }
-
- @Override
- default void stop() {
-
- }
}
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index dcff5c23842..b41a2a2ef33 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -24,15 +24,16 @@ import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CassandraComponentResumeStrategyIT extends BaseCassandra {
- private static class TestCassandraResumeStrategy implements CassandraResumeStrategy {
+ private static class TestCassandraResumeAdapter implements CassandraResumeAdapter {
private boolean sessionCalled;
private boolean sessionNotNull;
private boolean resumeCalled;
@@ -62,7 +63,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
}
private static final String CQL = "select login, first_name, last_name from camel_user";
- private final TestCassandraResumeStrategy resumeStrategy = new TestCassandraResumeStrategy();
+ private final TestCassandraResumeAdapter resumeStrategy = new TestCassandraResumeAdapter();
@Test
public void testConsumeAll() throws Exception {
@@ -88,7 +89,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
return new RouteBuilder() {
public void configure() {
fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
- .resumable(resumeStrategy)
+ .resumable(new TransientResumeStrategy(resumeStrategy))
.to("mock:resultAll");
}
};
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 1ac3bbefb05..a1dc66deefd 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -26,7 +26,8 @@ import com.couchbase.client.java.view.ViewResult;
import com.couchbase.client.java.view.ViewRow;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultScheduledPollConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID;
import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;
-public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<CouchbaseResumeStrategy> {
+public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);
@@ -45,7 +46,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
private final Collection collection;
private ViewOptions viewOptions;
- private CouchbaseResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) {
super(endpoint, processor);
@@ -96,9 +97,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
if (resumeStrategy != null) {
LOG.info("Couchbase consumer running with resume strategy enabled");
- resumeStrategy.setBucket(bucket);
- resumeStrategy.resume();
+ CouchbaseResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CouchbaseResumeAdapter.class);
+ if (resumeAdapter != null) {
+ resumeAdapter.setBucket(bucket);
+ resumeAdapter.resume();
+ }
}
}
@@ -182,12 +186,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
}
@Override
- public CouchbaseResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
@Override
- public void setResumeStrategy(CouchbaseResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
}
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
similarity index 79%
copy from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
copy to components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
index 6e90080b8b8..b7e87ec6a47 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
@@ -18,22 +18,12 @@
package org.apache.camel.component.couchbase;
import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
/**
- * Allow implementing resume strategies for couchbase consumers
+ * Allow implementing resume adapters for couchbase consumers
*/
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
- @Override
- default void start() {
-
- }
-
- @Override
- default void stop() {
-
- }
+public interface CouchbaseResumeAdapter extends ResumeAdapter {
/**
* Sets the bucket in use
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index ac7616a879e..20b03444564 100644
--- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -22,8 +22,9 @@ import java.util.concurrent.TimeUnit;
import com.couchbase.client.java.Bucket;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.couchbase.CouchbaseResumeStrategy;
+import org.apache.camel.component.couchbase.CouchbaseResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.resume.Resumables;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -32,7 +33,7 @@ import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
- static class TestResumeStrategy implements CouchbaseResumeStrategy {
+ static class TestCouchbaseResumeAdapter implements CouchbaseResumeAdapter {
volatile boolean setBucketCalled;
volatile boolean bucketNotNull;
volatile boolean resumeCalled;
@@ -49,7 +50,7 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
}
}
- TestResumeStrategy resumeStrategy = new TestResumeStrategy();
+ TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new TestCouchbaseResumeAdapter());
@Test
public void testQueryForBeers() throws Exception {
@@ -61,15 +62,18 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ TestCouchbaseResumeAdapter adapter = resumeStrategy.getAdapter(TestCouchbaseResumeAdapter.class);
+ await().atMost(30, TimeUnit.SECONDS).until(() -> adapter != null);
+
await().atMost(30, TimeUnit.SECONDS)
- .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.setBucketCalled,
+ .untilAsserted(() -> Assertions.assertTrue(adapter.setBucketCalled,
"The setBucket method should have been called"));
await().atMost(3, TimeUnit.SECONDS)
- .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.bucketNotNull,
+ .untilAsserted(() -> Assertions.assertTrue(adapter.bucketNotNull,
"The input bucket should not have been null"));
await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(
- () -> Assertions.assertTrue(resumeStrategy.resumeCalled, "The resume method should have been called"));
+ () -> Assertions.assertTrue(adapter.resumeCalled, "The resume method should have been called"));
}
@AfterEach
diff --git a/components/camel-couchdb/pom.xml b/components/camel-couchdb/pom.xml
index b83914157d7..0372ec29c29 100644
--- a/components/camel-couchdb/pom.xml
+++ b/components/camel-couchdb/pom.xml
@@ -39,6 +39,10 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-support</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-processor</artifactId>
+ </dependency>
<dependency>
<groupId>org.lightcouch</groupId>
<artifactId>lightcouch</artifactId>
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index 58a9a70d6ee..3b239cea610 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -21,8 +21,9 @@ import java.time.Duration;
import com.google.gson.JsonObject;
import org.apache.camel.Exchange;
import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.task.BlockingTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
@@ -53,10 +54,16 @@ public class CouchDbChangesetTracker implements Runnable {
CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence);
if (sequence == null) {
- CouchDbResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
+ ResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
- resumeStrategy.setResumable(resumable);
- resumeStrategy.resume();
+ assert resumeStrategy != null;
+
+ CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+
+ if (adapter != null) {
+ adapter.setResumable(resumable);
+ adapter.resume();
+ }
}
LOG.debug("Last sequence [{}]", resumable.getLastOffset());
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 615823c23ad..4512004fff1 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,17 +21,17 @@ import java.util.concurrent.ExecutorService;
import com.google.gson.JsonObject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
-public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<CouchDbResumeStrategy> {
+public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
private final CouchDbClientWrapper couchClient;
private final CouchDbEndpoint endpoint;
private ExecutorService executor;
private CouchDbChangesetTracker task;
- private CouchDbResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper couchClient, Processor processor) {
super(endpoint, processor);
@@ -40,12 +40,12 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Couc
}
@Override
- public void setResumeStrategy(CouchDbResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
@Override
- public CouchDbResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
index e6ebd10c7c8..d7a7e91311e 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
@@ -17,10 +17,10 @@
package org.apache.camel.component.couchdb.consumer;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
import org.apache.camel.component.couchdb.CouchDbClientWrapper;
+import org.apache.camel.resume.Offset;
import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
/**
* Wraps the resume data for CouchDb
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
similarity index 76%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
index 3a92a042e6d..f66a2ccd5a8 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
@@ -17,21 +17,16 @@
package org.apache.camel.component.couchdb.consumer;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
/**
- * Defines a resumable strategy usable by the CouchDB component
+ * Defines a resumable adapter usable by the CouchDB component
*/
-public interface CouchDbResumeStrategy extends ResumeStrategy {
+public interface CouchDbResumeAdapter extends ResumeAdapter {
+ /**
+ * Sets the resumable for the adapter
+ *
+ * @param resumable the resumable instance
+ */
void setResumable(CouchDbResumable resumable);
-
- @Override
- default void start() {
-
- }
-
- @Override
- default void stop() {
-
- }
}
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
index ce0196f0b15..b8515b1ad6a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
@@ -18,16 +18,18 @@
package org.apache.camel.component.couchdb.consumer;
import org.apache.camel.component.couchdb.CouchDbConsumer;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
public final class CouchDbResumeStrategyFactory {
private CouchDbResumeStrategyFactory() {
}
- public static CouchDbResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
- CouchDbResumeStrategy resumeStrategy = consumer.getResumeStrategy();
+ public static ResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
+ ResumeStrategy resumeStrategy = consumer.getResumeStrategy();
if (resumeStrategy == null) {
- resumeStrategy = new LatestUpdateSequenceResumeStrategy();
+ resumeStrategy = new TransientResumeStrategy(new LatestUpdateSequenceResumeAdapter());
}
return resumeStrategy;
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
similarity index 88%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
index 277d84243e7..e9815b9171a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
@@ -20,9 +20,9 @@ package org.apache.camel.component.couchdb.consumer;
import org.apache.camel.component.couchdb.CouchDbClientWrapper;
/**
- * A resume strategy that resumes from the last update sequence
+ * A resume adapter for couchdb that resumes from the last update sequence
*/
-public final class LatestUpdateSequenceResumeStrategy implements CouchDbResumeStrategy {
+public final class LatestUpdateSequenceResumeAdapter implements CouchDbResumeAdapter {
private CouchDbResumable resumable;
@Override
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6dedb43cd18..9d730a528a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -31,11 +31,13 @@ import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -44,10 +46,10 @@ import org.slf4j.LoggerFactory;
/**
* File consumer.
*/
-public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<FileConsumerResumeStrategy> {
+public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<ResumeStrategy> {
private static final Logger LOG = LoggerFactory.getLogger(FileConsumer.class);
- private FileConsumerResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
private String endpointPath;
private Set<String> extendedAttributes;
@@ -103,8 +105,11 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
GenericFile<File> gf
= asGenericFile(endpointPath, file, getEndpoint().getCharset(), getEndpoint().isProbeContentType());
- if (resumeStrategy instanceof GenericFileResumeStrategy) {
- ((GenericFileResumeStrategy<File>) resumeStrategy).resume(gf);
+ if (resumeStrategy != null) {
+ ResumeAdapter adapter = resumeStrategy.getAdapter();
+ if (adapter instanceof GenericFileResumeAdapter) {
+ ((FileResumeAdapter) adapter).resume(gf);
+ }
}
if (file.isDirectory()) {
@@ -171,11 +176,15 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
}
}
- if (resumeStrategy instanceof FileSetResumeStrategy) {
- FileResumeSet resumeSet = new FileResumeSet(dirFiles);
- resumeStrategy.resume(resumeSet);
+ if (resumeStrategy != null) {
+ ResumeAdapter adapter = resumeStrategy.getAdapter();
+ if (adapter instanceof FileSetResumeAdapter) {
+ FileResumeSet resumeSet = new FileResumeSet(dirFiles);
+
+ ((FileResumeAdapter) adapter).resume(resumeSet);
- return resumeSet.resumed();
+ return resumeSet.resumed();
+ }
}
return dirFiles;
@@ -307,12 +316,12 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
}
@Override
- public FileConsumerResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
@Override
- public void setResumeStrategy(FileConsumerResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index c04a0e9ecc3..c1fcc9de5f9 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -22,9 +22,9 @@ import java.nio.file.Path;
import java.util.Map;
import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
import org.apache.camel.WrappedFile;
import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.resume.Offset;
import org.apache.camel.resume.Offsets;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
similarity index 85%
copy from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
copy to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
index fed8000c512..25b807cbf40 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
@@ -17,12 +17,12 @@
package org.apache.camel.component.file.consumer;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
/**
- * Defines resume strategy for consumers of the file component.
+ * Defines resume adapter for consumers of the file component.
*/
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
+public interface FileResumeAdapter<T> extends ResumeAdapter {
/**
* Returns the last offset read for the given file.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
index f746c0f9fa3..41b1751223c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.util.Objects;
import java.util.function.Predicate;
-import org.apache.camel.ResumableSet;
+import org.apache.camel.resume.ResumableSet;
/**
* This contains the input/output file set for resume operations.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
similarity index 79%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
index dc75e80bd89..2e1f5e56477 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
@@ -17,6 +17,9 @@
package org.apache.camel.component.file.consumer;
-public interface FileSetResumeStrategy extends FileConsumerResumeStrategy<FileResumeSet> {
-
+/**
+ * Allows the implementation of file adapters for handling resume operations for file sets (i.e.: file entries in a
+ * directory)
+ */
+public interface FileSetResumeAdapter extends FileResumeAdapter<FileResumeSet> {
}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
index 037621d751f..79899514a54 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
@@ -17,7 +17,7 @@
package org.apache.camel.component.file.consumer;
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Resumable;
public interface GenericFileResumable<T> extends Resumable<T, Long> {
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
similarity index 66%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
index fed8000c512..8f7c1f2900c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
@@ -17,17 +17,18 @@
package org.apache.camel.component.file.consumer;
-import org.apache.camel.ResumeStrategy;
+import java.io.File;
+import java.util.Optional;
/**
- * Defines resume strategy for consumers of the file component.
+ * Allows the implementation of file adapters for handling resume operations for generic files
*/
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
-
+public interface GenericFileResumeAdapter extends FileResumeAdapter<GenericFileResumable<File>> {
/**
- * Returns the last offset read for the given file.
+ * Gets the last offset for the given file
*
- * @param resumable the resumable file or resumable set to run the resume
+ * @param addressable the file instance
+ * @return An Optional with the offset value
*/
- void resume(T resumable);
+ Optional<Long> getLastOffset(File addressable);
}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
new file mode 100644
index 00000000000..9a1f7f49f76
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeSet;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link FileSetResumeAdapter} that can be used for resume operations for multiple files. For
+ * instance, this can be used to manage the resume operations for files within a directory.
+ */
+public class DefaultFileSetResumeAdapter implements FileSetResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSetResumeAdapter.class);
+
+    // Queried with (parent directory, file) pairs; a pair the cache contains is treated as already processed
+    private final MultiEntryCache<File, File> cache;
+
+    /**
+     * Creates the adapter on top of the given cache.
+     *
+     * @param cache the cache that is queried, keyed by parent directory, to decide whether a file was processed
+     */
+    public DefaultFileSetResumeAdapter(MultiEntryCache<File, File> cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * Checks whether the cache lacks the given file under the entry of its parent directory.
+     *
+     * @param  file the file to check
+     * @return      {@code true} if the cache does not contain the file for its parent directory
+     */
+    private boolean notProcessed(File file) {
+        // if the file is in the cache, then it's already processed
+        return !cache.contains(file.getParentFile(), file);
+    }
+
+    /**
+     * Hands the {@code notProcessed} check to the resume set via {@code resumeEach}. A {@code null} set is only
+     * logged at trace level and otherwise ignored.
+     *
+     * @param resumable the file set to resume (may be {@code null})
+     */
+    @Override
+    public void resume(FileResumeSet resumable) {
+        if (resumable == null) {
+            LOG.trace("Nothing to resume");
+            return;
+        }
+
+        resumable.resumeEach(this::notProcessed);
+        if (resumable.hasResumables()) {
+            LOG.debug("There are {} files still to be processed", resumable.resumed().length);
+        }
+    }
+
+    /**
+     * Intentionally does nothing: this adapter only acts when given a resume set.
+     */
+    @Override
+    public void resume() {
+        // NO-OP
+    }
+}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
new file mode 100644
index 00000000000..54e339fe73e
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+import java.util.Optional;
+
+import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * An implementation of the {@link GenericFileResumeAdapter} that can be used to handle resume operations for file
+ * offsets (where the offsets are of Long format).
+ */
+public class DefaultGenericFileResumeAdapter implements GenericFileResumeAdapter {
+    // Maps a file to the offset stored for it
+    private final SingleEntryCache<File, Long> cache;
+
+    /**
+     * Creates the adapter on top of the given cache.
+     *
+     * @param cache the cache from which the offset of each file is read
+     */
+    public DefaultGenericFileResumeAdapter(SingleEntryCache<File, Long> cache) {
+        this.cache = cache;
+    }
+
+    /**
+     * Looks up the offset stored in the cache for the given file.
+     *
+     * @param  addressable the file instance
+     * @return             the value returned by the cache lookup for that file
+     */
+    @Override
+    public Optional<Long> getLastOffset(File addressable) {
+        return cache.get(addressable);
+    }
+
+    /**
+     * Updates the resumable with the offset cached for its addressable file. Nothing happens when the resumable is
+     * {@code null} or when the cache lookup yields an empty {@link Optional}.
+     *
+     * @param resumable the resumable file to update (may be {@code null})
+     */
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        if (resumable == null) {
+            return;
+        }
+
+        // query the cache directly so that resume does not depend on an overridable method
+        cache.get(resumable.getAddressable()).ifPresent(resumable::updateLastOffset);
+    }
+
+    /**
+     * Intentionally does nothing: this adapter only acts when given a resumable.
+     */
+    @Override
+    public void resume() {
+        // NO-OP
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f864cf272b6..3a6d2860329 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import org.apache.camel.ConsumerListenerAware;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
import org.apache.camel.Suspendable;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumer extends DefaultConsumer
- implements ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
+ implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
Suspendable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -56,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer
// This list helps to work around the infinite loop of KAFKA-1894
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
private volatile boolean stopOffsetRepo;
- private KafkaConsumerResumeStrategy resumeStrategy;
+ private ResumeStrategy resumeStrategy;
private KafkaConsumerListener consumerListener;
public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
@@ -65,12 +65,12 @@ public class KafkaConsumer extends DefaultConsumer
}
@Override
- public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
@Override
- public KafkaConsumerResumeStrategy getResumeStrategy() {
+ public ResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index b1befdfa5c3..f6f7392a6ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,7 +27,7 @@ import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
@@ -273,7 +273,7 @@ public class KafkaFetchRecords implements Runnable {
}
private void subscribe() {
- KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
+ KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer);
resumeStrategy.setConsumer(consumer);
PartitionAssignmentListener listener = new PartitionAssignmentListener(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
index 44f9f6337ed..58211ba86a0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -19,9 +19,9 @@ package org.apache.camel.component.kafka.consumer.errorhandler;
import java.util.function.Predicate;
-import org.apache.camel.ConsumerListener;
import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.resume.ConsumerListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
similarity index 65%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
index 97f25e0745d..d25c05662cb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
@@ -16,31 +16,32 @@
*/
package org.apache.camel.component.kafka.consumer.support;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
import org.apache.kafka.clients.consumer.Consumer;
/**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
+ * Defines an adapter for handling resume operations. Implementations can define different ways to handle how to resume
* processing records.
*
* The resume runs in the scope of the Kafka Consumer thread and may run concurrently with other consumer instances when
* the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
* thread-safety of the operations within the resume method.
*/
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy {
+public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
+
+ /**
+ * Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent
+ * access.
+ *
+ * @param consumer the consumer instance
+ */
void setConsumer(Consumer<?, ?> consumer);
- default void resume(KafkaResumable resumable) {
-
- }
-
- @Override
- default void start() {
-
- }
-
- @Override
- default void stop() {
-
- }
+ /**
+ * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. Guaranteed
+ * not to be null and safe to ignore if partition and topic information are not used.
+ *
+ * @param kafkaResumable the resumable instance
+ */
+ void setKafkaResumable(KafkaResumable kafkaResumable);
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
index 8a86b658b4d..11a0879a59c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
@@ -17,7 +17,7 @@
package org.apache.camel.component.kafka.consumer.support;
-import org.apache.camel.Offset;
+import org.apache.camel.resume.Offset;
import org.apache.camel.util.KeyValueHolder;
/**
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index 388f47fa3e5..03bc3a977b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -17,9 +17,9 @@
package org.apache.camel.component.kafka.consumer.support;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Offset;
import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
public class KafkaResumable implements Resumable<String, String> {
private final String partition;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
similarity index 82%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
index 4502831f8fb..3ef4f209c43 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
@@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
/**
* A resume strategy that uses Kafka's offset for resuming
*/
-public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class);
private final StateRepository<String, String> offsetRepository;
private Consumer<?, ?> consumer;
- public OffsetKafkaConsumerResumeStrategy(StateRepository<String, String> offsetRepository) {
+ public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) {
this.offsetRepository = offsetRepository;
}
@@ -46,6 +46,11 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
this.consumer = consumer;
}
+ @Override
+ public void setKafkaResumable(KafkaResumable kafkaResumable) {
+ // NO-OP
+ }
+
private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
// The state contains the last read offset, so you need to seek from the next one
long offset = deserializeOffsetValue(offsetState) + 1;
@@ -63,14 +68,4 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
}
}
}
-
- /*
- Note: when self-managing the offsets, we don't need to use the information on the resumable
- instance. We can collect the assignments directly from the consumer instance as we always did.
- */
- @SuppressWarnings("unused")
- @Override
- public void resume(KafkaResumable resumable) {
- resume();
- }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index e471d1e2ea6..03046f7a10b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -33,12 +33,12 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
private final String threadId;
private final KafkaConfiguration configuration;
- private final KafkaConsumerResumeStrategy resumeStrategy;
+ private final KafkaConsumerResumeAdapter resumeStrategy;
private final CommitManager commitManager;
public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
CommitManager commitManager,
- KafkaConsumerResumeStrategy resumeStrategy) {
+ KafkaConsumerResumeAdapter resumeStrategy) {
this.threadId = threadId;
this.configuration = configuration;
this.commitManager = commitManager;
@@ -67,6 +67,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
List<KafkaResumable> resumables = partitions.stream()
.map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
- resumables.forEach(r -> resumeStrategy.resume(r));
+ resumables.forEach(this::doResume);
+ }
+
+ private void doResume(KafkaResumable r) {
+ resumeStrategy.setKafkaResumable(r);
+ resumeStrategy.resume();
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 3784b4c5c15..77315f2e85b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
@@ -29,12 +30,17 @@ public final class ResumeStrategyFactory {
/**
* A NO-OP resume strategy that does nothing (i.e.: no resume)
*/
- private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+ private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
@SuppressWarnings("unused")
@Override
public void setConsumer(Consumer<?, ?> consumer) {
+ // NO-OP
+ }
+ @Override
+ public void setKafkaResumable(KafkaResumable kafkaResumable) {
+ // NO-OP
}
@SuppressWarnings("unused")
@@ -44,36 +50,40 @@ public final class ResumeStrategyFactory {
}
}
- private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeStrategy();
+ private static final NoOpKafkaConsumerResumeAdapter NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter();
private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class);
private ResumeStrategyFactory() {
}
- public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) {
+ public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) {
// When using resumable routes, which register the strategy via service, it takes priority over everything else
- KafkaConsumerResumeStrategy resumableRouteStrategy = kafkaConsumer.getResumeStrategy();
+ ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy();
+ if (resumeStrategy != null) {
+ KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class);
+
+ // The strategy should not be able to be created without an adapter, but let's be safe
+ assert adapter != null;
- if (resumableRouteStrategy != null) {
- return resumableRouteStrategy;
+ return adapter;
}
KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
- return builtinResumeStrategies(configuration);
+ return resolveBuiltinResumeAdapters(configuration);
}
- private static KafkaConsumerResumeStrategy builtinResumeStrategies(KafkaConfiguration configuration) {
+ private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
LOG.debug("No resume strategy was provided ... checking for built-ins ...");
StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
SeekPolicy seekTo = configuration.getSeekTo();
if (offsetRepository != null) {
LOG.info("Using resume from offset strategy");
- return new OffsetKafkaConsumerResumeStrategy(offsetRepository);
+ return new OffsetKafkaConsumerResumeAdapter(offsetRepository);
} else if (seekTo != null) {
LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
- return new SeekPolicyKafkaConsumerResumeStrategy(seekTo);
+ return new SeekPolicyKafkaConsumerResumeAdapter(seekTo);
}
LOG.info("Using NO-OP resume strategy");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
similarity index 80%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
index b877763854e..0dee4e1b849 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
@@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory;
/**
* A resume strategy that uses Camel's seekTo configuration for resuming
*/
-public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class);
private final SeekPolicy seekPolicy;
private Consumer<?, ?> consumer;
- public SeekPolicyKafkaConsumerResumeStrategy(SeekPolicy seekPolicy) {
+ public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) {
this.seekPolicy = seekPolicy;
}
@@ -40,6 +40,11 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
this.consumer = consumer;
}
+ @Override
+ public void setKafkaResumable(KafkaResumable kafkaResumable) {
+ // NO-OP
+ }
+
@Override
public void resume() {
if (seekPolicy == SeekPolicy.BEGINNING) {
@@ -50,13 +55,4 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
consumer.seekToEnd(consumer.assignment());
}
}
-
- /*
- * Note: when using the seek policy strategy, we don't use the resumable information
- * because we use the consumer to set the policy.
- */
- @Override
- public void resume(KafkaResumable resumable) {
- resume();
- }
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
similarity index 63%
copy from core/camel-api/src/main/java/org/apache/camel/Offset.java
copy to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index 1e73d1dcc2a..a1614d4bfc2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.processor.resume.kafka;
+
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
/**
- * Generic offset without a concrete type
- *
- * @param <T> the type of the offset
+ * Base interface for resume strategies that publish the offsets to a Kafka topic
+ *
+ * @param <K> the type of key
+ * @param <V> the type of the value
*/
-public interface Offset<T> {
-
- /**
- * Gets the offset value
- *
- * @return the offset value
- */
- T offset();
+public interface KafkaResumeStrategy<K, V> extends UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, ResumeStrategy {
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
new file mode 100644
index 00000000000..e9974cac4e8
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for multi node
+ * integrations. This is suitable, for instance, when using clusters with the master component.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class MultiNodeKafkaResumeStrategy<K, V> extends SingleNodeKafkaResumeStrategy<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
+ private final ExecutorService executorService;
+
+ /**
+ * Create a new instance of this class
+ *
+ * @param bootstrapServers the address of the Kafka broker
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ */
+ public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+ ResumeAdapter resumeAdapter) {
+ // just in case users don't want to provide their own worker thread pool
+ this(bootstrapServers, topic, resumeCache, resumeAdapter, Executors.newSingleThreadExecutor());
+ }
+
+ /**
+ * Builds an instance of this class
+ *
+ * @param bootstrapServers
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ * @param executorService an executor service that will run a separate thread for periodically refreshing the
+ * offsets
+ */
+
+ public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+ ResumeAdapter resumeAdapter, ExecutorService executorService) {
+ super(bootstrapServers, topic, resumeCache, resumeAdapter);
+
+ // We need to keep refreshing the cache
+ this.executorService = executorService;
+ executorService.submit(() -> refresh());
+ }
+
+ /**
+ * Builds an instance of this class
+ *
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ * @param producerConfig the set of properties to be used by the Kafka producer within this class
+ * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+ */
+ public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+ Properties producerConfig, Properties consumerConfig) {
+ this(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig, Executors.newSingleThreadExecutor());
+ }
+
+ /**
+ * Builds an instance of this class
+ *
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ * @param producerConfig the set of properties to be used by the Kafka producer within this class
+ * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+ * @param executorService an executor service that will run a separate thread for periodically refreshing the
+ * offsets
+ */
+
+ public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+ Properties producerConfig, Properties consumerConfig, ExecutorService executorService) {
+ super(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig);
+
+ this.executorService = executorService;
+ executorService.submit(() -> refresh());
+ }
+
+ /**
+ * Launch a thread to refresh the offsets periodically
+ */
+ protected void refresh() {
+ LOG.trace("Creating a offset cache refresher");
+ try {
+ Properties prop = (Properties) getConsumerConfig().clone();
+ prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+ Consumer<K, V> consumer = new KafkaConsumer<>(prop);
+
+ consumer.subscribe(Collections.singletonList(getTopic()));
+
+ while (true) {
+ var records = consumer.poll(getPollDuration());
+ if (records.isEmpty()) {
+ continue;
+ }
+
+ for (var record : records) {
+ V value = record.value();
+
+ LOG.trace("Read from Kafka: {}", value);
+ getResumeCache().add(record.key(), record.value());
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ executorService.shutdown();
+ } finally {
+ super.stop();
+ }
+ }
+}
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
similarity index 58%
rename from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
index 6e90080b8b8..c0bfbb4f8a1 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
@@ -15,30 +15,27 @@
* limitations under the License.
*/
-package org.apache.camel.component.couchbase;
+package org.apache.camel.processor.resume.kafka;
-import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.producer.RecordMetadata;
/**
- * Allow implementing resume strategies for couchbase consumers
+ * Contains the error details when failing to produce records
*/
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
- @Override
- default void start() {
+public class RecordError {
+ private final RecordMetadata recordMetadata;
+ private final Exception exception;
+ public RecordError(RecordMetadata recordMetadata, Exception exception) {
+ this.recordMetadata = recordMetadata;
+ this.exception = exception;
}
- @Override
- default void stop() {
-
+ public RecordMetadata getRecordMetadata() {
+ return recordMetadata;
}
- /**
- * Sets the bucket in use
- *
- * @param bucket the bucket in use
- */
- void setBucket(Bucket bucket);
+ public Exception getException() {
+ return exception;
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
similarity index 55%
rename from components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9960cecccf4..0a14bb481f6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -19,20 +19,20 @@ package org.apache.camel.processor.resume.kafka;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Objects;
import java.util.Properties;
+import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeCache;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -44,45 +44,64 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractKafkaResumeStrategy<K, V>
- implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {
- public static final int UNLIMITED = -1;
- private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
+ * integrations. For multi-node integrations (i.e.: using clusters with the master component), check
+ * {@link MultiNodeKafkaResumeStrategy}.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class SingleNodeKafkaResumeStrategy<K, V> implements KafkaResumeStrategy<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
private final String topic;
private Consumer<K, V> consumer;
private Producer<K, V> producer;
- private long errorCount;
private Duration pollDuration = Duration.ofSeconds(1);
- private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+ private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
private final ResumeCache<K, V> resumeCache;
private boolean subscribed;
private final Properties producerConfig;
private final Properties consumerConfig;
+ private ResumeAdapter resumeAdapter;
- public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
- this.topic = topic;
-
- this.producerConfig = createProducer(bootstrapServers);
- this.consumerConfig = createConsumer(bootstrapServers);
- this.resumeCache = resumeCache;
-
- init();
+ /**
+ * Builds an instance of this class
+ *
+ * @param bootstrapServers the address of the Kafka broker
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ */
+ public SingleNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+ ResumeAdapter resumeAdapter) {
+ this(topic, resumeCache, resumeAdapter, createProducer(bootstrapServers), createConsumer(bootstrapServers));
}
- public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
- Properties consumerConfig) {
- this.topic = topic;
+ /**
+ * Builds an instance of this class
+ *
+ * @param topic the topic where to publish the offsets
+ * @param resumeCache a cache instance where to store the offsets locally for faster access
+ * @param resumeAdapter the component-specific resume adapter
+ * @param producerConfig the set of properties to be used by the Kafka producer within this class
+ * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+ */
+ public SingleNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+ Properties producerConfig,
+ Properties consumerConfig) {
+ this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
this.resumeCache = resumeCache;
+ this.resumeAdapter = resumeAdapter;
this.producerConfig = producerConfig;
this.consumerConfig = consumerConfig;
@@ -132,24 +151,24 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
}
/**
- * Sends data to a topic
- *
+ * Sends data to a topic. The records will always be sent asynchronously. If there's an error, it will be added to
+ * the collection of producer errors.
+ *
+ * @see SingleNodeKafkaResumeStrategy#getProducerErrors()
* @param message the message to send
* @throws ExecutionException
* @throws InterruptedException
+ *
*/
- public void produce(K key, V message) throws ExecutionException, InterruptedException {
+ protected void produce(K key, V message) throws ExecutionException, InterruptedException {
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message);
- errorCount = 0;
- Future<RecordMetadata> future = producer.send(record, (recordMetadata, e) -> {
+ producer.send(record, (recordMetadata, e) -> {
if (e != null) {
LOG.error("Failed to send message {}", e.getMessage(), e);
- errorCount++;
+ producerErrors.add(new RecordError(recordMetadata, e));
}
});
-
- sentItems.add(future);
}
@Override
@@ -164,6 +183,11 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
resumeCache.add(key, offsetValue);
}
+ /**
+ * Loads the existing data into the cache
+ *
+ * @throws Exception
+ */
protected void loadCache() throws Exception {
subscribe();
@@ -192,11 +216,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
unsubscribe();
}
- // TODO: bad method ...
/**
+ * Subscribe to the topic if not subscribed yet
+ *
* @param topic the topic to consume the messages from
*/
- public void checkAndSubscribe(String topic) {
+ protected void checkAndSubscribe(String topic) {
if (!subscribed) {
consumer.subscribe(Collections.singletonList(topic));
@@ -205,40 +230,54 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
}
/**
- * @param topic the topic to consume the messages from
+ * Subscribe to the topic if not subscribed yet
+ *
+ * @param topic the topic to consume the messages from
+ * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
*/
public void checkAndSubscribe(String topic, long remaining) {
if (!subscribed) {
- consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
- consumer.seekToEnd(assignments);
- for (TopicPartition assignment : assignments) {
- final long endPosition = consumer.position(assignment);
- final long startPosition = endPosition - remaining;
-
- if (startPosition >= 0) {
- consumer.seek(assignment, startPosition);
- } else {
- LOG.info(
- "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
- }
- }
- }
- });
+ consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(remaining));
subscribed = true;
}
}
- public abstract void subscribe() throws Exception;
+ /**
+ * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
+ * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
+ *
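+ * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
+ * @return a rebalance listener that seeks each assigned partition back by {@code remaining} messages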
+ */
+ protected ConsumerRebalanceListener getConsumerRebalanceListener(long remaining) {
+ return new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+
+ }
- public void unsubscribe() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
+ for (TopicPartition assignment : assignments) {
+ final long endPosition = consumer.position(assignment);
+ final long startPosition = endPosition - remaining;
+
+ if (startPosition >= 0) {
+ consumer.seek(assignment, startPosition);
+ } else {
+ LOG.info(
+ "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Unsubscribe from the topic
+ */
+ protected void unsubscribe() {
try {
consumer.unsubscribe();
} catch (IllegalStateException e) {
@@ -249,17 +288,23 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
}
/**
- * Consumes message from the given topic until the predicate returns false
+ * Consumes messages from the topic that was previously set up
*
- * @return
+ * @return An instance of the consumer records
*/
- public ConsumerRecords<K, V> consume() {
+ protected ConsumerRecords<K, V> consume() {
int retries = 10;
return consume(retries);
}
- public ConsumerRecords<K, V> consume(int retries) {
+ /**
+ * Consumes messages from the topic that was previously set up
+ *
+ * @param retries how many times to retry consuming data from the topic
+ * @return An instance of the consumer records
+ */
+ protected ConsumerRecords<K, V> consume(int retries) {
while (retries > 0) {
ConsumerRecords<K, V> records = consumer.poll(pollDuration);
if (!records.isEmpty()) {
@@ -271,22 +316,35 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
return ConsumerRecords.empty();
}
- public long getErrorCount() {
- return errorCount;
+ public void subscribe() throws Exception {
+ if (resumeCache.capacity() >= 1) {
+ checkAndSubscribe(topic, resumeCache.capacity());
+ } else {
+ checkAndSubscribe(topic);
+ }
}
- public List<Future<RecordMetadata>> getSentItems() {
- return Collections.unmodifiableList(sentItems);
+ @Override
+ public ResumeAdapter getAdapter() {
+ return resumeAdapter;
+ }
+
+ /**
+ * Gets the errors recorded while sending records to the topic
+ *
+ * @return an unmodifiable view of the recorded producer errors
+ */
+ protected Collection<RecordError> getProducerErrors() {
+ return Collections.unmodifiableCollection(producerErrors);
}
@Override
public void build() {
- Service.super.build();
+ // NO-OP
}
@Override
public void init() {
- Service.super.init();
LOG.debug("Initializing the Kafka resume strategy");
if (consumer == null) {
@@ -300,12 +358,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
@Override
public void stop() {
+ // Release both Kafka clients held by this strategy: the producer first, then the consumer
+ LOG.info("Closing the Kafka producer");
+ IOHelper.close(producer, "Kafka producer", LOG);
+ LOG.info("Closing the Kafka consumer");
+ IOHelper.close(consumer, "Kafka consumer", LOG);
}
@Override
public void close() throws IOException {
- Service.super.close();
+ stop();
}
@Override
@@ -334,4 +396,27 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
protected Producer<K, V> getProducer() {
return producer;
}
+
+ protected Properties getProducerConfig() {
+ return producerConfig;
+ }
+
+ protected Properties getConsumerConfig() {
+ return consumerConfig;
+ }
+
+ protected String getTopic() {
+ return topic;
+ }
+
+ protected ResumeCache<K, V> getResumeCache() {
+ return resumeCache;
+ }
+
+ /**
+ * Clear the producer errors
+ */
+ public void resetProducerErrors() {
+ producerErrors.clear();
+ }
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 181009da26c..52388fd9081 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -25,15 +25,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.BindToRegistry;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
@@ -56,16 +58,13 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
private MockEndpoint result;
@BindToRegistry("resumeStrategy")
- private TestKafkaConsumerResumeStrategy resumeStrategy;
+ private TestUpdateStrategy resumeStrategy;
private CountDownLatch messagesLatch;
private KafkaProducer<Object, Object> producer;
- private static class TestKafkaConsumerResumeStrategy
- implements KafkaConsumerResumeStrategy,
- UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service {
+ private static class TestUpdateStrategy extends TransientResumeStrategy
+ implements UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>> {
private final CountDownLatch messagesLatch;
- private boolean resumeCalled;
- private boolean consumerIsNull = true;
private boolean startCalled;
private boolean offsetNull = true;
private boolean offsetAddressableNull = true;
@@ -74,34 +73,10 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
private boolean offsetValueEmpty = true;
private int lastOffset;
- public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
- this.messagesLatch = messagesLatch;
- }
-
- @Override
- public void setConsumer(Consumer<?, ?> consumer) {
- if (consumer != null) {
- consumerIsNull = false;
- }
- }
-
- @Override
- public void resume(KafkaResumable resumable) {
- resumeCalled = true;
-
- }
-
- @Override
- public void resume() {
- resumeCalled = true;
- }
-
- public boolean isResumeCalled() {
- return resumeCalled;
- }
+ public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) {
+ super(resumeAdapter);
- public boolean isConsumerIsNull() {
- return consumerIsNull;
+ this.messagesLatch = messagesLatch;
}
@Override
@@ -115,12 +90,8 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
LOG.warn("Init was called");
}
- public boolean isStartCalled() {
- return startCalled;
- }
-
@Override
- public void updateLastOffset(Resumable<String, Integer> offset) {
+ public void updateLastOffset(Resumable<String, Integer> offset) throws Exception {
try {
if (offset != null) {
offsetNull = false;
@@ -166,6 +137,40 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
public boolean isOffsetValueEmpty() {
return offsetValueEmpty;
}
+
+ public boolean isStartCalled() {
+ return startCalled;
+ }
+ }
+
+ private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+ private boolean resumeCalled;
+ private boolean consumerIsNull = true;
+
+ @Override
+ public void setConsumer(Consumer<?, ?> consumer) {
+ if (consumer != null) {
+ consumerIsNull = false;
+ }
+ }
+
+ @Override
+ public void setKafkaResumable(KafkaResumable kafkaResumable) {
+
+ }
+
+ @Override
+ public void resume() {
+ resumeCalled = true;
+ }
+
+ public boolean isResumeCalled() {
+ return resumeCalled;
+ }
+
+ public boolean isConsumerIsNull() {
+ return consumerIsNull;
+ }
}
@BeforeEach
@@ -183,7 +188,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
super.doPreSetup();
messagesLatch = new CountDownLatch(1);
- resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
+ resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch);
}
@Test
@@ -191,9 +196,11 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
public void testOffsetIsBeingChecked() throws InterruptedException {
assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
- assertTrue(resumeStrategy.isResumeCalled(),
+ final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class);
+ assertNotNull(adapter, "The adapter should not be null");
+ assertTrue(adapter.isResumeCalled(),
"The resume strategy should have been called when the partition was assigned");
- assertFalse(resumeStrategy.isConsumerIsNull(),
+ assertFalse(adapter.isConsumerIsNull(),
"The consumer passed to the strategy should not be null");
assertTrue(resumeStrategy.isStartCalled(),
"The resume strategy should have been started");
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index a79ce85a743..2e699a2de92 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,8 +21,6 @@ import java.util.Optional;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
import org.apache.camel.StartupListener;
import org.apache.camel.SuspendableService;
import org.apache.camel.api.management.ManagedAttribute;
@@ -31,6 +29,8 @@ import org.apache.camel.cluster.CamelClusterEventListener;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 0e6f90882c5..c5f46ec328c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.Resource;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
index d92808e5d2a..bbf6be9d018 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
import java.util.function.Predicate;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
index bbd24c59e47..2e188721028 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
* An interface to represent an object which wishes to support listening for consumer events using the
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
similarity index 96%
rename from core/camel-api/src/main/java/org/apache/camel/Offset.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
index 1e73d1dcc2a..c41f312d49a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
* Generic offset without a concrete type
diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/Resumable.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
index 804c05bf9a3..108af04293a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
* This provides an interface for resumable objects. Such objects allow its users to address them at a specific offset.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
index 0a18ba9e232..d09a61f7e72 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
import java.lang.reflect.Array;
import java.util.Arrays;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
similarity index 62%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
index f23e7625e75..53221ac7d10 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume adapter provides the component-specific logic that plugs the more generic strategy with the lower level
+ * requirements of the component being used.
+ *
+ * It is the responsibility of the supported components to implement the custom implementation for this part of the
+ * resume API, as well as to offer component-specific interfaces that can be specialized by other integrations.
*/
-public interface ResumeStrategy extends Service {
-
+public interface ResumeAdapter {
/**
- * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
- * type is specific to the component.
- *
+ * Execute the resume logic for the adapter
*/
void resume();
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
index 6ee69db413f..10f59b3c00c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
* An interface to represent an object which wishes to support resume operations using a {@link ResumeStrategy}.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
similarity index 65%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index f23e7625e75..06096be3b1e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
+
+import org.apache.camel.Service;
/**
* Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
@@ -24,9 +26,20 @@ package org.apache.camel;
public interface ResumeStrategy extends Service {
/**
- * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
- * type is specific to the component.
- *
+ * Gets an adapter for resuming operations
+ *
+ * @return
+ */
+ ResumeAdapter getAdapter();
+
+ /**
+ * Gets an adapter for resuming operations
+ *
+ * @param clazz the class of the adapter
+ * @return the adapter cast to the requested class, or null if there is no adapter (a ClassCastException is thrown if it is of another type)
+ * @param <T> the type of the adapter
*/
- void resume();
+ default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
+ return clazz.cast(getAdapter());
+ }
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
index 34bd71690f2..af6e682f198 100644
--- a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume;
/**
* An updatable resume strategy
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
similarity index 71%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
index 294e77223ab..0708d1ddff0 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
@@ -15,8 +15,15 @@
* limitations under the License.
*/
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.resume.cache;
-public interface GenericFileResumeStrategy<T> extends FileConsumerResumeStrategy<GenericFileResumable<T>> {
+/**
+ * A cache where a key can point to one or more entries. For instance, a path as the key and the file entries as its
+ * entries
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public interface MultiEntryCache<K, V> extends ResumeCache<K, V> {
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
similarity index 76%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index 0abe2e6dca7..c2ce3758f33 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume.cache;
-import java.util.Optional;
+import org.apache.camel.resume.ResumeStrategy;
/**
* This cache stored the resumed data from a {@link ResumeStrategy}.
@@ -27,6 +27,15 @@ import java.util.Optional;
*/
public interface ResumeCache<K, V> {
+ /**
+ * Whether the cache contains the key with the given entry value
+ *
+ * @param key the key
+ * @param entry the entry
+ * @return true if the key/entry pair is stored in the cache
+ */
+ boolean contains(K key, V entry);
+
/**
* Adds a value to the cache
*
@@ -43,10 +52,9 @@ public interface ResumeCache<K, V> {
boolean isFull();
/**
- * Gets the offset value for the key
+ * Gets the cache pool size
*
- * @param key the key
- * @return the key
+ * @return
*/
- Optional<V> get(K key);
+ long capacity();
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
similarity index 67%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
index f23e7625e75..e8b243846e7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
@@ -15,18 +15,23 @@
* limitations under the License.
*/
-package org.apache.camel;
+package org.apache.camel.resume.cache;
+
+import java.util.Optional;
/**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume cache where a single key can only be mapped to a single entry
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the entry
*/
-public interface ResumeStrategy extends Service {
-
+public interface SingleEntryCache<K, V> extends ResumeCache<K, V> {
/**
- * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
- * type is specific to the component.
+ * Gets the offset value for the key
*
+ * @param key the key
+ * @return the offset value for the key, if present
*/
- void resume();
+ Optional<V> get(K key);
+
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 8a8fc6a10d5..c35da8960f3 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -28,15 +28,11 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
-import org.apache.camel.ConsumerListener;
-import org.apache.camel.ConsumerListenerAware;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.NamedNode;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
import org.apache.camel.Route;
import org.apache.camel.RouteAware;
import org.apache.camel.Service;
@@ -44,6 +40,10 @@ import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
import org.apache.camel.SuspendableService;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.ManagementInterceptStrategy;
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 8176f7d6a52..bcaee49fecf 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -34,7 +34,7 @@ This instance can be bound in the Context registry as follows:
[source,java]
----
-getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy());
+getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy(new MyAdapter()));
from("some:component")
.resumable("testResumeStrategy")
@@ -46,129 +46,70 @@ Or the instance can be constructed as follows:
[source,java]
----
from("some:component")
- .resumable(new MyTestResumeStrategy())
+ .resumable(new MyTestResumeStrategy(new MyAdapter()))
.process(this::process)
----
+=== The Resume Adapter
+
+The adapter class's responsibility is to bind the component-specific part of the logic to the more generic handling of the
+resume strategy. The adapter is always component specific and some components may have more than one. Integrations with
+more complex resume processes may implement their own adapters, although the builtin ones should be useful in most of the
+cases. Currently, the following adapters are available:
+
+* camel-atom: `org.apache.camel.component.feed.EntryFilter`
+* camel-aws2-kinesis: `org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter`
+* camel-cassandracql: `org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter`
+* camel-couchbase: `org.apache.camel.component.couchbase.CouchbaseResumeAdapter`
+* camel-couchdb: `org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter`
+* camel-file: `org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter` for directories
+* camel-file: `org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter` for files
+* camel-kafka: `org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter`
+* camel-rss: `org.apache.camel.component.feed.EntryFilter`
+* generic: `org.apache.camel.processor.resume.DelegatingResumeAdapter`
+
+Note: in the future, these adapters will be resolved automatically by Camel.
+
== The Resume API Interfaces
These are the *core interfaces*:
-* `org.apache.camel.ResumeStrategy` - the basic resume strategy
-* `org.apache.camel.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
-* `org.apache.camel.ResumeCache` - an interface for local cache for resumable information
+* `org.apache.camel.resume.ResumeStrategy` - the resume strategy service
+* `org.apache.camel.resume.ResumeAdapter` - an adapter that binds the generic parts of the resume strategy with the component
+* `org.apache.camel.resume.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
+* `org.apache.camel.resume.cache.ResumeCache` - the base interface for local cache for resumable information
+* `org.apache.camel.resume.cache.SingleEntryCache` - an interface for local cache for resumable information where there is a one-to-one relationship between a cache key and its entry (i.e.: a file and its offset)
+* `org.apache.camel.resume.cache.MultiEntryCache` - an interface for local cache for resumable information where there is a one-to-many relationship between a cache key and its entries (i.e.: a path and its file entries)
These are the *core classes* supporting the strategies:
-* `org.apache.camel.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
-* `org.apache.camel.ResumableSet` - an interface for resumables with a 1-to-many relationship
-* `org.apache.camel.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
+* `org.apache.camel.resume.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
+* `org.apache.camel.resume.ResumableSet` - an interface for resumables with a 1-to-many relationship
+* `org.apache.camel.resume.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
These are the *supporting classes*:
* `org.apache.camel.support.Resumables` - resumables handling support
* `org.apache.camel.support.Offsets` - offset handling support
-== Basic Strategies
-
-The basic strategies offer a component-specific skeleton that can be used to implement strategies.
-
-* `AbstractKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets.
-
-[source,java]
-----
-public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class);
- public static final int CACHE_SIZE = 100;
-
- private final String topic;
- private final ResumeCache<K, Long> cache;
+== Builtin Resume Strategies
- public KafkaResumeStrategy(String topic,
- ResumeCache<K, Long> cache,
- DefaultProducerPropertyFactory producerPropertyFactory,
- DefaultConsumerPropertyFactory consumerPropertyFactory)
- {
- super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
- this.topic = topic;
- this.cache = cache;
- }
+Camel comes with a few builtin strategies that can be used to store, retrieve and update the offsets. The following strategies are available:
+* `SingleNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for single node integrations.
+* `MultiNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for multi node integrations (i.e.: integrations running on clusters using the xref:components::master-component.adoc[camel-master] component).
- private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
- final File addressable = resumable.getAddressable();
- return getLastOffset((K) addressable);
- }
- public Optional<Long> getLastOffset(K addressable) {
- return cache.get(addressable);
- }
-
- @Override
- public void subscribe() {
- checkAndSubscribe(topic, 1);
- }
-
- @Override
- public void resume(GenericFileResumable<File> resumable) {
- final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
-
- if (!lastOffsetOpt.isPresent()) {
- return;
- }
-
- final long lastOffset = lastOffsetOpt.get();
- resumable.updateLastOffset(lastOffset);
- }
-
- @Override
- public void resume() {
- throw new UnsupportedOperationException("Cannot perform blind resume");
- }
-}
-----
+=== Implementing New Builtin Resume Strategies
+New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
== Local Cache Support
A sample local cache implemented using https://github.com/ben-manes/caffeine[Caffeine].
-[source,java]
-----
-public class SingleItemCache<K> implements ResumeCache<K, Long> {
- public static final int CACHE_SIZE = 100;
- private final Cache<K, Long> cache = Caffeine.newBuilder()
- .maximumSize(CACHE_SIZE)
- .build();
-
- @Override
- public void add(K key, Long offsetValue) {
- cache.put(key, offsetValue);
- }
-
- @Override
- public Optional<Long> get(K key) {
- Long entry = cache.getIfPresent(key);
-
- if (entry == null) {
- return Optional.empty();
- }
-
- return Optional.of(entry.longValue());
- }
-
- @Override
- public boolean isFull() {
- if (cache.estimatedSize() < CACHE_SIZE) {
- return true;
- }
-
- return false;
- }
-}
-----
-
+* `org.apache.camel.component.caffeine.resume.single.CaffeineCache`: for data where 1 key can only point to 1 entry (1-to-1 relationship)
+* `org.apache.camel.component.caffeine.resume.multi.CaffeineCache`: for data where 1 key can point to 1 or more entries (1-to-many relationship)
== Known Limitations
@@ -209,7 +150,7 @@ Currently, support for pausable consumers is available for the following compone
To use the API, it needs an instance of a Consumer listener along with a predicate that tests whether to continue.
-* `org.apache.camel.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
+* `org.apache.camel.resume.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
* a predicate that returns true if data consumption should resume or false if consumption should be put on pause
Usage example:
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
@@ -12,7 +12,7 @@
"output": false
},
"properties": {
- "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+ "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
"untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
index acbea228865..03a0a440e46 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
@@ -12,7 +12,7 @@
"output": false
},
"properties": {
- "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+ "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
index d6f1bdfcaf2..7c8ceef4738 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import org.apache.camel.ConsumerListener;
+import org.apache.camel.resume.ConsumerListener;
import org.apache.camel.spi.Metadata;
/**
@@ -40,7 +40,7 @@ public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
private ConsumerListener<?, ?> consumerListenerBean;
@XmlAttribute(required = true)
- @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+ @Metadata(required = true, javaType = "org.apache.camel.resume.ConsumerListener")
private String consumerListener;
@XmlTransient
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index af065fcbae0..e2b0cadf078 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.BeanScope;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -45,7 +44,6 @@ import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
import org.apache.camel.builder.DataFormatClause;
import org.apache.camel.builder.EndpointConsumerBuilder;
import org.apache.camel.builder.EndpointProducerBuilder;
@@ -60,6 +58,8 @@ import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.LanguageExpression;
import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.AsEndpointUri;
import org.apache.camel.spi.AsPredicate;
import org.apache.camel.spi.DataFormat;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 4e2d62deb86..bc68161ae30 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -22,7 +22,7 @@ import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.Metadata;
/**
@@ -37,7 +37,7 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
private ResumeStrategy resumeStrategyBean;
@XmlAttribute(required = true)
- @Metadata(required = true, javaType = "org.apache.camel.ResumeStrategy")
+ @Metadata(required = true, javaType = "org.apache.camel.resume.ResumeStrategy")
private String resumeStrategy;
@Metadata(label = "advanced", javaType = "org.apache.camel.LoggingLevel", defaultValue = "ERROR",
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
index 85954c3b6f7..09c3ab6db35 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
@@ -24,10 +24,10 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.resume.ConsumerListener;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
new file mode 100644
index 00000000000..6aee7fd8f4f
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A delegating adapter that can be used to delegate to and/or abstract resume adapters
+ */
+public class DelegatingResumeAdapter implements ResumeAdapter {
+ private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeAdapter.class);
+
+ private final List<ResumeAdapter> resumeStrategies;
+
+ public DelegatingResumeAdapter() {
+ resumeStrategies = new ArrayList<>();
+ }
+
+ protected DelegatingResumeAdapter(List<ResumeAdapter> resumeStrategies) {
+ this.resumeStrategies = resumeStrategies;
+ }
+
+ public boolean add(ResumeAdapter resumeAdapter) {
+ return resumeStrategies.add(resumeAdapter);
+ }
+
+ public boolean remove(Object resumeAdapter) {
+ return resumeStrategies.remove(resumeAdapter);
+ }
+
+ public boolean removeIf(Predicate<? super ResumeAdapter> filter) {
+ return resumeStrategies.removeIf(filter);
+ }
+
+ @Override
+ public void resume() {
+ resumeStrategies.forEach(ResumeAdapter::resume);
+ }
+
+ public int size() {
+ return resumeStrategies.size();
+ }
+
+ public boolean isEmpty() {
+ return resumeStrategies.isEmpty();
+ }
+
+ public boolean contains(Object o) {
+ return resumeStrategies.contains(o);
+ }
+
+ public Iterator<ResumeAdapter> iterator() {
+ return resumeStrategies.iterator();
+ }
+
+ public Object[] toArray() {
+ return resumeStrategies.toArray();
+ }
+
+ public <T> T[] toArray(T[] a) {
+ return resumeStrategies.toArray(a);
+ }
+
+ public boolean containsAll(Collection<?> c) {
+ return resumeStrategies.containsAll(c);
+ }
+
+ public boolean addAll(Collection<? extends ResumeAdapter> c) {
+ return resumeStrategies.addAll(c);
+ }
+
+ public boolean addAll(int index, Collection<? extends ResumeAdapter> c) {
+ return resumeStrategies.addAll(index, c);
+ }
+
+ public boolean removeAll(Collection<?> c) {
+ return resumeStrategies.removeAll(c);
+ }
+
+ public boolean retainAll(Collection<?> c) {
+ return resumeStrategies.retainAll(c);
+ }
+
+ public void replaceAll(UnaryOperator<ResumeAdapter> operator) {
+ resumeStrategies.replaceAll(operator);
+ }
+
+ public void sort(Comparator<? super ResumeAdapter> c) {
+ resumeStrategies.sort(c);
+ }
+
+ public void clear() {
+ resumeStrategies.clear();
+ }
+
+ public ResumeAdapter get(int index) {
+ return resumeStrategies.get(index);
+ }
+
+ public ResumeAdapter set(int index, ResumeAdapter element) {
+ return resumeStrategies.set(index, element);
+ }
+
+ public void add(int index, ResumeAdapter element) {
+ resumeStrategies.add(index, element);
+ }
+
+ public ResumeAdapter remove(int index) {
+ return resumeStrategies.remove(index);
+ }
+
+ public int indexOf(Object o) {
+ return resumeStrategies.indexOf(o);
+ }
+
+ public int lastIndexOf(Object o) {
+ return resumeStrategies.lastIndexOf(o);
+ }
+
+ public ListIterator<ResumeAdapter> listIterator() {
+ return resumeStrategies.listIterator();
+ }
+
+ public ListIterator<ResumeAdapter> listIterator(int index) {
+ return resumeStrategies.listIterator(index);
+ }
+
+ public List<ResumeAdapter> subList(int fromIndex, int toIndex) {
+ return resumeStrategies.subList(fromIndex, toIndex);
+ }
+
+ public Spliterator<ResumeAdapter> spliterator() {
+ return resumeStrategies.spliterator();
+ }
+
+ public <T> T[] toArray(IntFunction<T[]> generator) {
+ return resumeStrategies.toArray(generator);
+ }
+
+ public Stream<ResumeAdapter> stream() {
+ return resumeStrategies.stream();
+ }
+
+ public Stream<ResumeAdapter> parallelStream() {
+ return resumeStrategies.parallelStream();
+ }
+
+ public void forEach(Consumer<? super ResumeAdapter> action) {
+ resumeStrategies.forEach(action);
+ }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
deleted file mode 100644
index 71ea930f95c..00000000000
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Predicate;
-
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A delegating strategy that can be used to delegate to and/or abstract resume strategies
- */
-public class DelegatingResumeStrategy implements ResumeStrategy {
- private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeStrategy.class);
-
- private final List<ResumeStrategy> resumeStrategies;
-
- public DelegatingResumeStrategy() {
- resumeStrategies = new ArrayList<>();
- }
-
- protected DelegatingResumeStrategy(List<ResumeStrategy> resumeStrategies) {
- this.resumeStrategies = resumeStrategies;
- }
-
- public boolean add(ResumeStrategy resumeStrategy) {
- return resumeStrategies.add(resumeStrategy);
- }
-
- public boolean remove(Object resumeStrategy) {
- return resumeStrategies.remove(resumeStrategy);
- }
-
- public boolean removeIf(Predicate<? super ResumeStrategy> filter) {
- return resumeStrategies.removeIf(filter);
- }
-
- @Override
- public void resume() {
- resumeStrategies.forEach(ResumeStrategy::resume);
- }
-
- @Override
- public void start() {
- resumeStrategies.forEach(Service::start);
- }
-
- @Override
- public void stop() {
- resumeStrategies.forEach(Service::stop);
- }
-
- @Override
- public void build() {
- resumeStrategies.forEach(Service::build);
- }
-
- @Override
- public void init() {
- resumeStrategies.forEach(Service::init);
- }
-
- private void close(ResumeStrategy resumeStrategy) {
- try {
- resumeStrategy.close();
- } catch (IOException e) {
- LOG.warn("Failed to close resume strategy {}: {}", resumeStrategy.getClass().getSimpleName(), e.getMessage());
- }
- }
-
- @Override
- public void close() throws IOException {
- resumeStrategies.forEach(r -> close(r));
- }
-
- @Override
- public String toString() {
- return "DelegatingResumeStrategy{" +
- "resumeStrategies=" + resumeStrategies +
- '}';
- }
-}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index a09d04591e6..aa347da63e1 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -19,9 +19,9 @@ package org.apache.camel.processor.resume;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ExchangeHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 240376d5acf..34b2b475c7e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -29,7 +29,7 @@ import org.apache.camel.ExtendedExchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.Synchronization;
@@ -67,6 +67,13 @@ public class ResumableProcessor extends AsyncProcessorSupport
super.doStart();
}
+ @Override
+ protected void doStop() throws Exception {
+ LOG.info("Stopping the resumable strategy: {}", resumeStrategy.getClass().getSimpleName());
+ resumeStrategy.stop();
+ super.doStop();
+ }
+
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
similarity index 54%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
copy to core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 388f47fa3e5..b0ffd26c080 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -15,33 +15,34 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.processor.resume;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategy;
-public class KafkaResumable implements Resumable<String, String> {
- private final String partition;
- private String offset;
+/**
+ * A resume strategy that keeps all the resume strategy information in memory. This is hardly useful for production
+ * level implementations, but can be useful for testing the resume strategies
+ */
+public class TransientResumeStrategy implements ResumeStrategy {
+ private final ResumeAdapter resumeAdapter;
- public KafkaResumable(String partition, String offset) {
- this.partition = partition;
- this.offset = offset;
+ public TransientResumeStrategy(ResumeAdapter resumeAdapter) {
+ this.resumeAdapter = resumeAdapter;
}
@Override
- public void updateLastOffset(String offset) {
- this.offset = offset;
+ public ResumeAdapter getAdapter() {
+ return resumeAdapter;
}
@Override
- public Offset<String> getLastOffset() {
- return Offsets.of(offset);
+ public void start() {
+ // this is NO-OP
}
@Override
- public String getAddressable() {
- return partition;
+ public void stop() {
+ // this is NO-OP
}
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
index 0626fcb05e5..f67406b871d 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
@@ -19,12 +19,12 @@ package org.apache.camel.reifier;
import java.util.function.Predicate;
-import org.apache.camel.ConsumerListener;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.PausableDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.PausableProcessor;
+import org.apache.camel.resume.ConsumerListener;
import org.apache.camel.util.ObjectHelper;
public class PausableReifier extends ProcessorReifier<PausableDefinition> {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index c96b14a941c..da5c21e9dc1 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -18,11 +18,11 @@ package org.apache.camel.reifier;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
import org.apache.camel.Route;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ResumableDefinition;
import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.util.ObjectHelper;
public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index f258e5e32c9..9053c2ca3d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -20,25 +20,27 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Resumable;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport {
- private static class TestResumeStrategy implements GenericFileResumeStrategy<File> {
+ private static class TestFileResumeAdapter implements GenericFileResumeAdapter {
@Override
public void resume(GenericFileResumable<File> resumable) {
if (!resumable.getAddressable().getName().startsWith("resume-from-offset")) {
@@ -55,17 +57,12 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
}
@Override
- public void start() {
-
- }
-
- @Override
- public void stop() {
-
+ public Optional<Long> getLastOffset(File addressable) {
+ return Optional.empty();
}
}
- private static class FailResumeStrategy extends TestResumeStrategy
+ private static class FailResumeAdapter extends TestFileResumeAdapter
implements UpdatableConsumerResumeStrategy<File, Long, Resumable<File, Long>> {
private boolean called;
@@ -75,7 +72,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
}
}
- private static final FailResumeStrategy FAIL_RESUME_STRATEGY = new FailResumeStrategy();
+ private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter());
@DisplayName("Tests whether it can resume from an offset")
@Test
@@ -106,7 +103,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
List<Exchange> exchangeList = mock.getExchanges();
Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
- Assertions.assertFalse(FAIL_RESUME_STRATEGY.called);
+ Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
}
@DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")
@@ -127,7 +124,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
@Override
public void configure() {
- bindToRegistry("myResumeStrategy", new TestResumeStrategy());
+ bindToRegistry("myResumeStrategy", new TransientResumeStrategy(new TestFileResumeAdapter()));
bindToRegistry("resumeNotToBeCalledStrategy", FAIL_RESUME_STRATEGY);
from(fileUri("resumeOff?noop=true&recursive=true"))
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index c8308b0b0ff..b0f6ef96026 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -25,14 +25,15 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
import org.apache.camel.resume.Resumables;
import org.junit.jupiter.api.Test;
public class FileConsumerResumeStrategyTest extends ContextTestSupport {
- private static class TestResumeStrategy implements FileSetResumeStrategy {
+ private static class TestFileSetResumeAdapter implements FileSetResumeAdapter {
private List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
private FileResumeSet resumeSet;
@@ -49,16 +50,6 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
resumeSet.resumeEach(f -> !processedFiles.contains(f.getName()));
}
}
-
- @Override
- public void start() {
-
- }
-
- @Override
- public void stop() {
-
- }
}
private static Map<String, Object> headerFor(int num) {
@@ -99,7 +90,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
- bindToRegistry("testResumeStrategy", new TestResumeStrategy());
+ bindToRegistry("testResumeStrategy", new TransientResumeStrategy(new TestFileSetResumeAdapter()));
from(fileUri("resume?noop=true&recursive=true"))
.resumable("testResumeStrategy")
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
index a7f46836879..ab5421af693 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
@@ -17,8 +17,6 @@
package org.apache.camel.resume;
-import org.apache.camel.Offset;
-
/**
* Offset handling support
*/
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
index 38bfed33c69..05a931cfd6b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
@@ -17,9 +17,6 @@
package org.apache.camel.resume;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-
/**
* A wrapper for resumable entities
*/