You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/11/18 09:00:55 UTC
[camel] 13/23: CAMEL-13691: camel-resilience4j - WIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git
commit bc493f54ce930ee9d4d3ba8f94b80de1d3f9aab5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 17 12:41:07 2019 +0100
CAMEL-13691: camel-resilience4j - WIP
---
.../resilience4j/ResilienceProcessor.java | 19 +++--
.../resilience4j/ResilienceRouteRejectedTest.java | 80 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 5 deletions(-)
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 1b1a039..1f6dbe3 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -28,6 +28,7 @@ import java.util.function.Supplier;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.timelimiter.TimeLimiter;
@@ -231,7 +232,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
}
@ManagedOperation(description = "Transitions the state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.")
- public void transitionToForceOpenState() {
+ public void transitionToForcedOpenState() {
if (circuitBreaker != null) {
circuitBreaker.transitionToForcedOpenState();
}
@@ -438,16 +439,27 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
// the circuit breaker triggered a timeout (and there is no fallback) so lets mark the exchange as failed
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_TIMED_OUT, true);
exchange.setException(throwable);
return exchange;
+ } else if (throwable instanceof CallNotPermittedException) {
+ // the circuit breaker triggered a call rejected
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+ return exchange;
} else {
// throw exception so resilient4j know it was a failure
throw RuntimeExchangeException.wrapRuntimeException(throwable);
}
}
- // fallback route is handling the exception
+ // fallback route is handling the exception so its short-circuited
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
+ exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
// store the last to endpoint as the failure endpoint
if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
@@ -470,9 +482,6 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements CamelC
exchange.setException(e);
}
- exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false);
- exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true);
-
return exchange;
}
}
diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java
new file mode 100644
index 0000000..80b87bd
--- /dev/null
+++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ResilienceRouteRejectedTest extends CamelTestSupport {
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ protected MBeanServer getMBeanServer() {
+ return context.getManagementStrategy().getManagementAgent().getMBeanServer();
+ }
+
+ @Test
+ public void testResilience() throws Exception {
+ // look inside jmx
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+
+ // context name
+ String name = context.getManagementName();
+
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"myResilience\"");
+
+ // force it into open state
+ mbeanServer.invoke(on, "transitionToForcedOpenState", null, null);
+ String state = (String) mbeanServer.getAttribute(on, "CircuitBreakerState");
+ assertEquals("FORCED_OPEN", state);
+
+ // send message which should get rejected, so the message is not changed
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .circuitBreaker().id("myResilience")
+ .to("direct:foo")
+ .to("log:foo")
+ .end()
+ .to("log:result")
+ .to("mock:result");
+
+ from("direct:foo")
+ .transform().constant("Bye World");
+ }
+ };
+ }
+
+}