You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/02 16:22:19 UTC

[camel] branch main updated: CAMEL-15562: adjust the resumable processing to run at the end of the route

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 67662bb  CAMEL-15562: adjust the resumable processing to run at the end of the route
67662bb is described below

commit 67662bba11bd043301d83d84c35d737ba34ffd14
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Mar 2 13:45:42 2022 +0100

    CAMEL-15562: adjust the resumable processing to run at the end of the route
---
 .../docs/modules/eips/pages/resume-strategies.adoc | 12 +++---
 .../processor/resume/ResumableCompletion.java      | 24 ++++++++---
 .../camel/processor/resume/ResumableProcessor.java | 46 ++--------------------
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 30 ++++++++++++--
 .../file/FileConsumerResumeStrategyTest.java       |  2 +-
 5 files changed, 56 insertions(+), 58 deletions(-)

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index e608045..656197f 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -26,8 +26,8 @@ This instance can be bound in the Context registry as follows:
 getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy());
 
 from("some:component")
-    .process(this::process)
-    .resumable().resumableStrategy("testResumeStrategy");
+    .resumable().resumableStrategy("testResumeStrategy")
+    .process(this::process);
 ----
 
 Or the instance can be constructed as follows:
@@ -168,9 +168,9 @@ For instance, the behavior of:
 [source,java]
 ----
 from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .resumable().resumableStrategy("testResumeStrategy")
     .convertBodyTo(Reader.class)
-    .process(this::process)
-    .resumable().resumableStrategyRef("testResumeStrategy");
+    .process(this::process);
 ----
 
 Is different from the behavior of:
@@ -178,9 +178,9 @@ Is different from the behavior of:
 [source,java]
 ----
 from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+    .resumable().resumableStrategyRef("testResumeStrategy")
     .convertBodyTo(InputStream.class)
-    .process(this::process)
-    .resumable().resumableStrategyRef("testResumeStrategy");
+    .process(this::process);
 ----
 
 *Reason*: the `skip` method in the Reader will skip characters, whereas the same method on the InputStream will skip bytes.
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index c7e712f..5b213cb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -30,16 +30,27 @@ public class ResumableCompletion implements Synchronization {
     private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class);
 
     private final ResumeStrategy resumeStrategy;
-    private final Resumable<?, ?> resumable;
 
-    public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) {
+    public ResumableCompletion(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
-        this.resumable = resumable;
     }
 
     @Override
     public void onComplete(Exchange exchange) {
-        if (!ExchangeHelper.isFailureHandled(exchange)) {
+        if (ExchangeHelper.isFailureHandled(exchange)) {
+            return;
+        }
+
+        Object offset = ExchangeHelper.getResultMessage(exchange).getHeader(Exchange.OFFSET);
+
+        if (offset instanceof Resumable) {
+            Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing the resumable: {}", resumable.getAddressable());
+                LOG.debug("Processing the resumable of type: {}", resumable.getLastOffset().offset());
+            }
+
             if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
                 UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
                         = (UpdatableConsumerResumeStrategy) resumeStrategy;
@@ -51,11 +62,14 @@ public class ResumableCompletion implements Synchronization {
             } else {
                 LOG.debug("Cannot perform an offset update because the strategy is not updatable");
             }
+        } else {
+            exchange.setException(new NoOffsetException(exchange));
+            LOG.warn("Cannot update the last offset because it's not available");
         }
     }
 
     @Override
     public void onFailure(Exchange exchange) {
-        LOG.warn("Skipping offset update for {} due to failure in processing", resumable.getAddressable());
+        LOG.warn("Skipping offset update for due to failure in processing");
     }
 }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 467f124..4221614 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -25,9 +25,9 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.Resumable;
 import org.apache.camel.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
@@ -51,32 +51,6 @@ public class ResumableProcessor extends AsyncProcessorSupport
     private String id;
     private String routeId;
 
-    private static class ResumableProcessorCallback implements AsyncCallback {
-
-        private final Exchange exchange;
-        private final Synchronization completion;
-        private final AsyncCallback callback;
-
-        public ResumableProcessorCallback(Exchange exchange, Synchronization completion, AsyncCallback callback) {
-            this.exchange = exchange;
-            this.completion = completion;
-            this.callback = callback;
-        }
-
-        @Override
-        public void done(boolean doneSync) {
-            try {
-                if (exchange.isFailed()) {
-                    completion.onFailure(exchange);
-                } else {
-                    completion.onComplete(exchange);
-                }
-            } finally {
-                callback.done(doneSync);
-            }
-        }
-    }
-
     public ResumableProcessor(ResumeStrategy resumeStrategy, Processor processor) {
         this.resumeStrategy = Objects.requireNonNull(resumeStrategy);
         this.processor = AsyncProcessorConverterHelper.convert(processor);
@@ -92,23 +66,11 @@ public class ResumableProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        Object offset = exchange.getMessage().getHeader(Exchange.OFFSET);
-
-        if (offset instanceof Resumable) {
-            Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+        final Synchronization onCompletion = new ResumableCompletion(resumeStrategy);
 
-            LOG.warn("Processing the resumable: {}", resumable.getAddressable());
-            LOG.warn("Processing the resumable of type: {}", resumable.getLastOffset().offset());
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
 
-            final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, resumable);
-            final AsyncCallback target = new ResumableProcessorCallback(exchange, onCompletion, callback);
-            return processor.process(exchange, target);
-
-        } else {
-            exchange.setException(new NoOffsetException(exchange));
-            LOG.warn("Cannot update the last offset because it's not available");
-            return true;
-        }
+        return processor.process(exchange, callback);
     }
 
     @Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index a084eb8..044b79e 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -18,21 +18,25 @@ package org.apache.camel.component.file;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.GenericFileResumable;
 import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.resume.Resumables;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
 public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport {
-
     private static class TestResumeStrategy implements GenericFileResumeStrategy<File> {
         @Override
         public void resume(GenericFileResumable<File> resumable) {
@@ -60,6 +64,18 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
     }
 
+    private static class FailResumeStrategy extends TestResumeStrategy
+            implements UpdatableConsumerResumeStrategy<File, Long, Resumable<File, Long>> {
+        private boolean called;
+
+        @Override
+        public void updateLastOffset(Resumable<File, Long> offset) {
+            called = true;
+        }
+    }
+
+    private static final FailResumeStrategy FAIL_RESUME_STRATEGY = new FailResumeStrategy();
+
     @DisplayName("Tests whether it can resume from an offset")
     @Test
     public void testResumeFromOffset() throws Exception {
@@ -81,10 +97,15 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
     public void testMissingOffset() throws InterruptedException {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceivedInAnyOrder("34567890");
+        mock.expectedMessageCount(2);
 
         template.sendBodyAndHeader(fileUri("resumeMissingOffset"), "01234567890", Exchange.FILE_NAME, "resume-from-offset.txt");
 
-        mock.assertIsNotSatisfied();
+        MockEndpoint.assertWait(2, TimeUnit.SECONDS, mock);
+
+        List<Exchange> exchangeList = mock.getExchanges();
+        Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
+        Assertions.assertFalse(FAIL_RESUME_STRATEGY.called);
     }
 
     @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")
@@ -106,16 +127,17 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
             public void configure() {
 
                 bindToRegistry("resumeStrategy", new TestResumeStrategy());
+                bindToRegistry("resumeNotToBeCalledStrategy", FAIL_RESUME_STRATEGY);
 
                 from(fileUri("resumeOff?noop=true&recursive=true"))
+                        .resumable().resumeStrategy("resumeStrategy")
                         .setHeader(Exchange.OFFSET,
                                 constant(Resumables.of("resume-none.txt", 3)))
-                        .resumable().resumeStrategy("resumeStrategy")
                         .log("${body}")
                         .convertBodyTo(String.class).to("mock:result");
 
                 from(fileUri("resumeMissingOffset?noop=true&recursive=true"))
-                        .resumable().resumeStrategy("resumeStrategy")
+                        .resumable().resumeStrategy("resumeNotToBeCalledStrategy")
                         .log("${body}")
                         .convertBodyTo(String.class).to("mock:result");
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 470a459..77e4e9d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -102,8 +102,8 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
                 bindToRegistry("testResumeStrategy", new TestResumeStrategy());
 
                 from(fileUri("resume?noop=true&recursive=true"))
-                        .process(e -> setOffset(e))
                         .resumable().resumeStrategy("testResumeStrategy")
+                        .process(e -> setOffset(e))
                         .convertBodyTo(String.class)
                         .to("mock:result");
             }