You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2022/12/24 11:45:34 UTC

[camel] branch CAMEL-18835 created (now fa4a7883c7e)

This is an automated email from the ASF dual-hosted git repository.

bvahdat pushed a change to branch CAMEL-18835
in repository https://gitbox.apache.org/repos/asf/camel.git


      at fa4a7883c7e CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once

This branch includes the following new commits:

     new fa4a7883c7e CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once

Posted by bv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bvahdat pushed a commit to branch CAMEL-18835
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fa4a7883c7e2e9f6b5febb97cf300fff5c459071
Author: Babak Vahdat <bv...@apache.org>
AuthorDate: Sat Dec 24 12:44:57 2022 +0100

    CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once
---
 .../camel/processor/OnCompletionProcessor.java     |  63 +++++++------
 .../model/RouteConfigurationOnCompletionTest.java  | 102 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 30 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index b7de659df48..58c708c29cc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -217,7 +217,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         Exchange answer;
 
         if (isCreateCopy()) {
-            // for asynchronous routing we must use a copy as we dont want it
+            // for asynchronous routing we must use a copy as we don't want it
             // to cause side effects of the original exchange
             // (the original thread will run in parallel)
             answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
@@ -277,31 +277,12 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void onComplete(final Exchange exchange) {
-            String currentRouteId = ExchangeHelper.getRouteId(exchange);
-            if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) {
-                return;
-            }
-
-            if (routeScoped) {
-                // check if we visited the route
-                List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
-                if (routeIds == null || !routeIds.contains(routeId)) {
-                    return;
-                }
-            }
-
-            if (onFailureOnly) {
+            if (shouldSkip(exchange, onFailureOnly)) {
                 return;
             }
 
-            if (onWhen != null && !onWhen.matches(exchange)) {
-                // predicate did not match so do not route the onComplete
-                return;
-            }
-
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
 
             if (executorService != null) {
@@ -321,16 +302,11 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
         @Override
         public void onFailure(final Exchange exchange) {
-            if (onCompleteOnly) {
-                return;
-            }
-
-            if (onWhen != null && !onWhen.matches(exchange)) {
-                // predicate did not match so do not route the onComplete
+            if (shouldSkip(exchange, onCompleteOnly)) {
                 return;
             }
 
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
             final Exception original = copy.getException();
             if (original != null) {
@@ -358,6 +334,33 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
             }
         }
 
+        @SuppressWarnings("unchecked")
+        private boolean shouldSkip(Exchange exchange, boolean onCompleteOrOnFailureOnly) {
+            String currentRouteId = ExchangeHelper.getRouteId(exchange);
+            if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) {
+                return true;
+            }
+
+            if (routeScoped) {
+                // check if we visited the route
+                List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
+                if (routeIds == null || !routeIds.contains(routeId)) {
+                    return true;
+                }
+            }
+
+            if (onCompleteOrOnFailureOnly) {
+                return true;
+            }
+
+            if (onWhen != null && !onWhen.matches(exchange)) {
+                // predicate did not match so do not route the onComplete
+                return true;
+            }
+
+            return false;
+        }
+
         @Override
         public String toString() {
             if (!onCompleteOnly && !onFailureOnly) {
@@ -432,7 +435,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
                 return;
             }
 
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
 
             if (executorService != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java
new file mode 100644
index 00000000000..2dddb921b73
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.OnCompletionTest;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class RouteConfigurationOnCompletionTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        RouteConfigurationDefinition routeConfigurationDefinition = new RouteConfigurationDefinition();
+        routeConfigurationDefinition.onCompletion().onCompleteOnly().to("log:ok").to("mock:ok");
+        routeConfigurationDefinition.onCompletion().onFailureOnly().to("log:fail").to("mock:fail");
+
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.adapt(ModelCamelContext.class).addRouteConfiguration(routeConfigurationDefinition);
+
+        return camelContext;
+    }
+
+    @Test
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:ok").expectedMessageCount(1);
+        getMockEndpoint("mock:fail").expectedMessageCount(0);
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testFail() throws Exception {
+        getMockEndpoint("mock:ok").expectedMessageCount(0);
+        getMockEndpoint("mock:fail").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should have thrown exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOkAndFail() throws Exception {
+        getMockEndpoint("mock:ok").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:fail").expectedBodiesReceived("Kabom");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("direct:end");
+
+                from("direct:end")
+                        // CAMEL-18835: apply the processor by this route and not the one above to
+                        // enforce multiple calls to the OnCompletionProcessor#onFailure callback
+                        .process(new OnCompletionTest.MyProcessor())
+                        .to("mock:result");
+            }
+        };
+    }
+
+}