You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:55 UTC

[camel] 13/23: CAMEL-13691: camel-resilience4j - WIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bc493f54ce930ee9d4d3ba8f94b80de1d3f9aab5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 17 12:41:07 2019 +0100

    CAMEL-13691: camel-resilience4j - WIP
---
 .../resilience4j/ResilienceProcessor.java          | 19 +++--
 .../resilience4j/ResilienceRouteRejectedTest.java  | 80 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 5 deletions(-)

diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 1b1a039..1f6dbe3 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -28,6 +28,7 @@ import java.util.function.Supplier;
 
 import io.github.resilience4j.bulkhead.Bulkhead;
 import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
 import io.github.resilience4j.circuitbreaker.CircuitBreaker;
 import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
 import io.github.resilience4j.timelimiter.TimeLimiter;
@@ -231,7 +232,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
     }
 
     @ManagedOperation(description = "Transitions the state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.")
-    public void transitionToForceOpenState() {
+    public void transitionToForcedOpenState() {
         if (circuitBreaker != null) {
             circuitBreaker.transitionToForcedOpenState();
         }
@@ -438,16 +439,27 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
                     // the circuit breaker triggered a timeout (and there is no fallback) so lets mark the exchange as failed
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, false);
                     exchange.setProperty(CircuitBreakerConstants.RESPONSE_TIMED_OUT, true);
                     exchange.setException(throwable);
                     return exchange;
+                } else if (throwable instanceof CallNotPermittedException) {
+                    // the circuit breaker triggered a call rejected
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+                    exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+                    return exchange;
                 } else {
                     // throw exception so resilient4j know it was a failure
                     throw RuntimeExchangeException.wrapRuntimeException(throwable);
                 }
             }
 
-            // fallback route is handling the exception
+            // fallback route is handling the exception so it's short-circuited
+            exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+            exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
+            exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
 
             // store the last to endpoint as the failure endpoint
             if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
@@ -470,9 +482,6 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
                 exchange.setException(e);
             }
 
-            exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
-            exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
-
             return exchange;
         }
     }
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java
new file mode 100644
index 0000000..80b87bd
--- /dev/null
+++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ResilienceRouteRejectedTest extends CamelTestSupport {
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    protected MBeanServer getMBeanServer() {
+        return context.getManagementStrategy().getManagementAgent().getMBeanServer();
+    }
+
+    @Test
+    public void testResilience() throws Exception {
+        // look up the circuit breaker processor through JMX
+        // (the MBean name used below matches the id assigned in the route)
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // management name of the CamelContext, used as the context key of the ObjectName
+        String name = context.getManagementName();
+
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"myResilience\"");
+
+        // force the circuit breaker into the FORCED_OPEN state and verify it over JMX
+        mbeanServer.invoke(on, "transitionToForcedOpenState", null, null);
+        String state = (String) mbeanServer.getAttribute(on, "CircuitBreakerState");
+        assertEquals("FORCED_OPEN", state);
+
+        // the call should be rejected while forced open, so the body must arrive unchanged
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .circuitBreaker().id("myResilience")
+                        .to("direct:foo")
+                        .to("log:foo")
+                    .end()
+                    .to("log:result")
+                    .to("mock:result");
+
+                from("direct:foo")
+                    .transform().constant("Bye World");
+            }
+        };
+    }
+
+}