You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/02 16:22:19 UTC
[camel] branch main updated: CAMEL-15562: adjust the resumable processing to run at the end of the route
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 67662bb CAMEL-15562: adjust the resumable processing to run at the end of the route
67662bb is described below
commit 67662bba11bd043301d83d84c35d737ba34ffd14
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Mar 2 13:45:42 2022 +0100
CAMEL-15562: adjust the resumable processing to run at the end of the route
---
.../docs/modules/eips/pages/resume-strategies.adoc | 12 +++---
.../processor/resume/ResumableCompletion.java | 24 ++++++++---
.../camel/processor/resume/ResumableProcessor.java | 46 ++--------------------
.../FileConsumerResumeFromOffsetStrategyTest.java | 30 ++++++++++++--
.../file/FileConsumerResumeStrategyTest.java | 2 +-
5 files changed, 56 insertions(+), 58 deletions(-)
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index e608045..656197f 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -26,8 +26,8 @@ This instance can be bound in the Context registry as follows:
getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy());
from("some:component")
- .process(this::process)
- .resumable().resumableStrategy("testResumeStrategy");
+ .resumable().resumableStrategy("testResumeStrategy")
+ .process(this::process);
----
Or the instance can be constructed as follows:
@@ -168,9 +168,9 @@ For instance, the behavior of:
[source,java]
----
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+ .resumable().resumableStrategy("testResumeStrategy")
.convertBodyTo(Reader.class)
- .process(this::process)
- .resumable().resumableStrategyRef("testResumeStrategy");
+ .process(this::process);
----
Is different from the behavior of:
@@ -178,9 +178,9 @@ Is different from the behavior of:
[source,java]
----
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+ .resumable().resumableStrategyRef("testResumeStrategy")
.convertBodyTo(InputStream.class)
- .process(this::process)
- .resumable().resumableStrategyRef("testResumeStrategy");
+ .process(this::process);
----
*Reason*: the `skip` method in the Reader will skip characters, whereas the same method on the InputStream will skip bytes.
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index c7e712f..5b213cb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -30,16 +30,27 @@ public class ResumableCompletion implements Synchronization {
private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class);
private final ResumeStrategy resumeStrategy;
- private final Resumable<?, ?> resumable;
- public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) {
+ public ResumableCompletion(ResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
- this.resumable = resumable;
}
@Override
public void onComplete(Exchange exchange) {
- if (!ExchangeHelper.isFailureHandled(exchange)) {
+ if (ExchangeHelper.isFailureHandled(exchange)) {
+ return;
+ }
+
+ Object offset = ExchangeHelper.getResultMessage(exchange).getHeader(Exchange.OFFSET);
+
+ if (offset instanceof Resumable) {
+ Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing the resumable: {}", resumable.getAddressable());
+ LOG.debug("Processing the resumable of type: {}", resumable.getLastOffset().offset());
+ }
+
if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
= (UpdatableConsumerResumeStrategy) resumeStrategy;
@@ -51,11 +62,14 @@ public class ResumableCompletion implements Synchronization {
} else {
LOG.debug("Cannot perform an offset update because the strategy is not updatable");
}
+ } else {
+ exchange.setException(new NoOffsetException(exchange));
+ LOG.warn("Cannot update the last offset because it's not available");
}
}
@Override
public void onFailure(Exchange exchange) {
- LOG.warn("Skipping offset update for {} due to failure in processing", resumable.getAddressable());
+ LOG.warn("Skipping offset update for due to failure in processing");
}
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 467f124..4221614 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -25,9 +25,9 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
-import org.apache.camel.Resumable;
import org.apache.camel.ResumeStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
@@ -51,32 +51,6 @@ public class ResumableProcessor extends AsyncProcessorSupport
private String id;
private String routeId;
- private static class ResumableProcessorCallback implements AsyncCallback {
-
- private final Exchange exchange;
- private final Synchronization completion;
- private final AsyncCallback callback;
-
- public ResumableProcessorCallback(Exchange exchange, Synchronization completion, AsyncCallback callback) {
- this.exchange = exchange;
- this.completion = completion;
- this.callback = callback;
- }
-
- @Override
- public void done(boolean doneSync) {
- try {
- if (exchange.isFailed()) {
- completion.onFailure(exchange);
- } else {
- completion.onComplete(exchange);
- }
- } finally {
- callback.done(doneSync);
- }
- }
- }
-
public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor) {
this.resumeStrategy = Objects.requireNonNull(resumeStrategy);
this.processor = AsyncProcessorConverterHelper.convert(processor);
@@ -92,23 +66,11 @@ public class ResumableProcessor extends AsyncProcessorSupport
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- Object offset = exchange.getMessage().getHeader(Exchange.OFFSET);
-
- if (offset instanceof Resumable) {
- Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+ final Synchronization onCompletion = new ResumableCompletion(resumeStrategy);
- LOG.warn("Processing the resumable: {}", resumable.getAddressable());
- LOG.warn("Processing the resumable of type: {}", resumable.getLastOffset().offset());
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
- final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, resumable);
- final AsyncCallback target = new ResumableProcessorCallback(exchange, onCompletion, callback);
- return processor.process(exchange, target);
-
- } else {
- exchange.setException(new NoOffsetException(exchange));
- LOG.warn("Cannot update the last offset because it's not available");
- return true;
- }
+ return processor.process(exchange, callback);
}
@Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index a084eb8..044b79e 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -18,21 +18,25 @@ package org.apache.camel.component.file;
import java.io.File;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.consumer.GenericFileResumable;
import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.resume.Resumables;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport {
-
private static class TestResumeStrategy implements GenericFileResumeStrategy<File> {
@Override
public void resume(GenericFileResumable<File> resumable) {
@@ -60,6 +64,18 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
}
}
+ private static class FailResumeStrategy extends TestResumeStrategy
+ implements UpdatableConsumerResumeStrategy<File, Long, Resumable<File, Long>> {
+ private boolean called;
+
+ @Override
+ public void updateLastOffset(Resumable<File, Long> offset) {
+ called = true;
+ }
+ }
+
+ private static final FailResumeStrategy FAIL_RESUME_STRATEGY = new FailResumeStrategy();
+
@DisplayName("Tests whether it can resume from an offset")
@Test
public void testResumeFromOffset() throws Exception {
@@ -81,10 +97,15 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
public void testMissingOffset() throws InterruptedException {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceivedInAnyOrder("34567890");
+ mock.expectedMessageCount(2);
template.sendBodyAndHeader(fileUri("resumeMissingOffset"), "01234567890", Exchange.FILE_NAME, "resume-from-offset.txt");
- mock.assertIsNotSatisfied();
+ MockEndpoint.assertWait(2, TimeUnit.SECONDS, mock);
+
+ List<Exchange> exchangeList = mock.getExchanges();
+ Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
+ Assertions.assertFalse(FAIL_RESUME_STRATEGY.called);
}
@DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")
@@ -106,16 +127,17 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
public void configure() {
bindToRegistry("resumeStrategy", new TestResumeStrategy());
+ bindToRegistry("resumeNotToBeCalledStrategy", FAIL_RESUME_STRATEGY);
from(fileUri("resumeOff?noop=true&recursive=true"))
+ .resumable().resumeStrategy("resumeStrategy")
.setHeader(Exchange.OFFSET,
constant(Resumables.of("resume-none.txt", 3)))
- .resumable().resumeStrategy("resumeStrategy")
.log("${body}")
.convertBodyTo(String.class).to("mock:result");
from(fileUri("resumeMissingOffset?noop=true&recursive=true"))
- .resumable().resumeStrategy("resumeStrategy")
+ .resumable().resumeStrategy("resumeNotToBeCalledStrategy")
.log("${body}")
.convertBodyTo(String.class).to("mock:result");
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 470a459..77e4e9d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -102,8 +102,8 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
bindToRegistry("testResumeStrategy", new TestResumeStrategy());
from(fileUri("resume?noop=true&recursive=true"))
- .process(e -> setOffset(e))
.resumable().resumeStrategy("testResumeStrategy")
+ .process(e -> setOffset(e))
.convertBodyTo(String.class)
.to("mock:result");
}