Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:15 UTC

[camel] 09/16: CAMEL-17117: preliminary documentation for the resume strategy

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ece9df55eca1fad0878ecf48a916c06a2afd6e5b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Feb 24 10:06:34 2022 +0100

    CAMEL-17117: preliminary documentation for the resume strategy
---
 .../docs/modules/eips/pages/resume-strategies.adoc | 179 +++++++++++++++++++++
 1 file changed, 179 insertions(+)

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
new file mode 100644
index 0000000..b284b4c
--- /dev/null
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -0,0 +1,179 @@
+= Resume Strategies
+:doctitle: Resume Strategies
+:shortname: resume
+:description: Provide strategies to allow consuming data from specific offsets
+:since:
+:supportlevel: Experimental
+
+The resume strategies allow users to implement strategies that point the consumer part of the routes to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed.
+
+The resume strategies can be used to allow quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and starting Camel would cause the consumer in the File component to read all the bytes of the given file again, starting from the initial offset (offset 0). A resume strategy allows integrations to point the consumer to the exact offset at which to resume the operations.
+
+The resume strategies come in three parts:
+
+* A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation.
+* A set of core infrastructure that allows integrations to implement different types of strategies.
+* Basic strategy implementations that can be extended to implement the specific resume strategies required by the integrations.
+
+=== The DSL method
+
+The route needs to use the `resumable()` method followed by `resumableStrategyRef()` to point to an instance of the resume strategy in use. This instance needs to be bound in the Camel context registry under the referenced name.
+
+[source,java]
+----
+getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+from("some:component")
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+
+=== The Core Interfaces
+
+These are the *core interfaces*:
+
+* org.apache.camel.ResumeStrategy: the basic resume strategy
+* org.apache.camel.UpdatableConsumerResumeStrategy: an extension to the resume strategy to allow updatable strategies
+* org.apache.camel.ResumeCache: an interface for a local cache of resumable information
+
+These are the *core types* supporting the strategies:
+
+* org.apache.camel.Resumable: an interface to allow users to work with abstract resumable entities (files, offsets, etc)
+* org.apache.camel.ResumableSet: an interface for resumables with a 1-to-many relationship
+* org.apache.camel.Offset: a generic offset without a concrete type (it may represent a long, a file name, etc)
+
+
+These are the *supporting classes*:
+
+* org.apache.camel.support.Resumables: resumables handling support
+* org.apache.camel.support.Offsets: offset handling support
+
+
+=== Basic Strategies
+
+The basic strategies offer a component-specific skeleton that can be used to implement strategies.
+
+* AbstractKafkaResumeStrategy: a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets.
+
+
+[source,java]
+----
+public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class);
+    public static final int CACHE_SIZE = 100;
+
+    private final String topic;
+    private final ResumeCache<K, Long> cache;
+
+    public KafkaResumeStrategy(String topic,
+                               ResumeCache<K, Long> cache,
+                               DefaultProducerPropertyFactory producerPropertyFactory,
+                               DefaultConsumerPropertyFactory consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
+        final File addressable = resumable.getAddressable();
+        return getLastOffset((K) addressable);
+    }
+
+    public Optional<Long> getLastOffset(K addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic, 1);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
+----
+
+
+=== Local Cache Support
+
+A sample local cache implemented using https://github.com/ben-manes/caffeine[Caffeine].
+
+[source,java]
+----
+public class SingleItemCache<K> implements ResumeCache<K, Long> {
+    public static final int CACHE_SIZE = 100;
+    private final Cache<K, Long> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public void add(K key, Long offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<Long> get(K key) {
+        Long entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry.longValue());
+    }
+
+    @Override
+    public boolean isFull() {
+        // The cache is full only once the estimated entry count reaches CACHE_SIZE.
+        if (cache.estimatedSize() >= CACHE_SIZE) {
+            return true;
+        }
+        return false;
+    }
+}
+----
+
+
+
+=== Known Limitations, Bugs and Other Notes
+
+* When using the converters with the file component, beware of the differences in the behavior from `Reader` and `InputStream`:
+
+For instance, the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .convertBodyTo(Reader.class)
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
+is different from the behavior of:
+
+[source,java]
+----
+from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .convertBodyTo(InputStream.class)
+    .process(this::process)
+    .resumable().resumableStrategyRef("testResumeStrategy");
+----
+
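+*Reason*: the `skip` method in the `Reader` will skip characters, whereas the same method on the `InputStream` will skip bytes. With a multi-byte encoding such as UTF-8, the same numeric offset can therefore resume at two different positions in the file.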
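
The character-versus-byte difference noted above can be reproduced with plain JDK classes. The following is a minimal sketch, not part of the commit: `SkipDemo` and its helper methods are hypothetical names used only for illustration, and the sample text assumes UTF-8, where `é` occupies two bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it does not exist in Camel.
final class SkipDemo {

    // Skips `offset` characters through a Reader and returns the next character.
    static char nextCharAfterReaderSkip(byte[] data, long offset) throws IOException {
        try (Reader reader = new InputStreamReader(
                new ByteArrayInputStream(data), StandardCharsets.UTF_8)) {
            reader.skip(offset);
            return (char) reader.read();
        }
    }

    // Skips `offset` bytes through an InputStream and returns the next raw byte (0-255).
    static int nextByteAfterStreamSkip(byte[] data, long offset) throws IOException {
        try (InputStream in = new ByteArrayInputStream(data)) {
            in.skip(offset);
            return in.read();
        }
    }

    public static void main(String[] args) throws IOException {
        // "héllo": 5 characters, but 6 bytes in UTF-8 because 'é' needs two bytes.
        byte[] data = "h\u00e9llo".getBytes(StandardCharsets.UTF_8);

        // Same numeric offset, different landing positions.
        System.out.println("Reader, offset 2: " + nextCharAfterReaderSkip(data, 2));
        System.out.printf("InputStream, offset 2: 0x%02X%n", nextByteAfterStreamSkip(data, 2));
    }
}
```

With offset 2, the `Reader` resumes at the first `l`, while the `InputStream` resumes in the middle of the two-byte sequence for `é`. An offset recorded through one type should therefore be replayed through the same type.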