You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2022/12/24 16:06:59 UTC

[camel] branch main updated: CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once

This is an automated email from the ASF dual-hosted git repository.

bvahdat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 51b44036065 CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once
51b44036065 is described below

commit 51b440360653e72073ea30055d790d7477aebb2a
Author: Babak Vahdat <bv...@apache.org>
AuthorDate: Sat Dec 24 12:44:57 2022 +0100

    CAMEL-18835: OnCompletionProcessor#onFailure callback fires more than once
---
 .../camel/processor/OnCompletionProcessor.java     |  63 +++++++------
 .../model/RouteConfigurationOnCompletionTest.java  | 105 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 30 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index b7de659df48..58c708c29cc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -217,7 +217,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         Exchange answer;
 
         if (isCreateCopy()) {
-            // for asynchronous routing we must use a copy as we dont want it
+            // for asynchronous routing we must use a copy as we don't want it
             // to cause side effects of the original exchange
             // (the original thread will run in parallel)
             answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
@@ -277,31 +277,12 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void onComplete(final Exchange exchange) {
-            String currentRouteId = ExchangeHelper.getRouteId(exchange);
-            if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) {
-                return;
-            }
-
-            if (routeScoped) {
-                // check if we visited the route
-                List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
-                if (routeIds == null || !routeIds.contains(routeId)) {
-                    return;
-                }
-            }
-
-            if (onFailureOnly) {
+            if (shouldSkip(exchange, onFailureOnly)) {
                 return;
             }
 
-            if (onWhen != null && !onWhen.matches(exchange)) {
-                // predicate did not match so do not route the onComplete
-                return;
-            }
-
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
 
             if (executorService != null) {
@@ -321,16 +302,11 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
 
         @Override
         public void onFailure(final Exchange exchange) {
-            if (onCompleteOnly) {
-                return;
-            }
-
-            if (onWhen != null && !onWhen.matches(exchange)) {
-                // predicate did not match so do not route the onComplete
+            if (shouldSkip(exchange, onCompleteOnly)) {
                 return;
             }
 
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
             final Exception original = copy.getException();
             if (original != null) {
@@ -358,6 +334,33 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
             }
         }
 
+        @SuppressWarnings("unchecked")
+        private boolean shouldSkip(Exchange exchange, boolean onCompleteOrOnFailureOnly) {
+            String currentRouteId = ExchangeHelper.getRouteId(exchange);
+            if (!routeScoped && currentRouteId != null && !routeId.equals(currentRouteId)) {
+                return true;
+            }
+
+            if (routeScoped) {
+                // check if we visited the route
+                List<String> routeIds = exchange.getProperty(ExchangePropertyKey.ON_COMPLETION_ROUTE_IDS, List.class);
+                if (routeIds == null || !routeIds.contains(routeId)) {
+                    return true;
+                }
+            }
+
+            if (onCompleteOrOnFailureOnly) {
+                return true;
+            }
+
+            if (onWhen != null && !onWhen.matches(exchange)) {
+                // predicate did not match so do not route the onComplete
+                return true;
+            }
+
+            return false;
+        }
+
         @Override
         public String toString() {
             if (!onCompleteOnly && !onFailureOnly) {
@@ -432,7 +435,7 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
                 return;
             }
 
-            // must use a copy as we dont want it to cause side effects of the original exchange
+            // must use a copy as we don't want it to cause side effects of the original exchange
             final Exchange copy = prepareExchange(exchange);
 
             if (executorService != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java
new file mode 100644
index 00000000000..a589ad5baf9
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/model/RouteConfigurationOnCompletionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.RouteConfigurationBuilder;
+import org.apache.camel.processor.OnCompletionTest;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class RouteConfigurationOnCompletionTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.addRoutes(new RouteConfigurationBuilder() {
+            @Override
+            public void configuration() throws Exception {
+                routeConfiguration().onCompletion().onCompleteOnly().to("log:ok").to("mock:ok");
+                routeConfiguration().onCompletion().onFailureOnly().to("log:fail").to("mock:fail");
+            }
+        });
+
+        return camelContext;
+    }
+
+    @Test
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:ok").expectedMessageCount(1);
+        getMockEndpoint("mock:fail").expectedMessageCount(0);
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testFail() throws Exception {
+        getMockEndpoint("mock:ok").expectedMessageCount(0);
+        getMockEndpoint("mock:fail").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should have thrown exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testOkAndFail() throws Exception {
+        getMockEndpoint("mock:ok").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:fail").expectedBodiesReceived("Kabom");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("direct:end");
+
+                from("direct:end")
+                        // CAMEL-18835: apply the processor by this route and not the one above to
+                        // enforce multiple calls to the OnCompletionProcessor#onFailure callback
+                        .process(new OnCompletionTest.MyProcessor())
+                        .to("mock:result");
+            }
+        };
+    }
+
+}