You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/17 09:59:56 UTC
[3/3] camel git commit: Fixed typo in names. This closes #1400
Fixed typo in names. This closes #1400
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/35137521
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/35137521
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/35137521
Branch: refs/heads/master
Commit: 351375214872adc7a16ac18839cf9a8552ad4dc8
Parents: 72438fd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 17 10:26:38 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 17 10:26:38 2017 +0100
----------------------------------------------------------------------
.../impl/ThrottingExceptionHalfOpenHandler.java | 31 -----
.../ThrottlingExceptionHalfOpenHandler.java | 32 +++++
.../impl/ThrottlingExceptionRoutePolicy.java | 36 ++---
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 -------------------
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ------------------
...rottingExceptionRoutePolicyHalfOpenTest.java | 121 -----------------
.../camel/processor/ThrottleException.java | 38 ------
.../camel/processor/ThrottlingException.java | 38 ++++++
...ptionRoutePolicyHalfOpenHandlerSedaTest.java | 133 +++++++++++++++++++
...ExceptionRoutePolicyHalfOpenHandlerTest.java | 132 ++++++++++++++++++
...ottlingExceptionRoutePolicyHalfOpenTest.java | 121 +++++++++++++++++
.../ThrottlingExceptionRoutePolicyTest.java | 8 +-
12 files changed, 477 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
deleted file mode 100644
index 82cff2c..0000000
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottingExceptionHalfOpenHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-/**
- * used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
- * to handle the half open circuit state and how to determine if a route
- * should be closed
- *
- */
-public interface ThrottingExceptionHalfOpenHandler {
- /**
- * check the state of the Camel route
- * @return true to close the route and false to leave open
- */
- boolean isReadyToBeClosed();
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
new file mode 100644
index 0000000..d9d6269
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+/**
+ * Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code
+ * to handle the half open circuit state and how to determine if a route
+ * should be closed
+ *
+ */
+public interface ThrottlingExceptionHalfOpenHandler {
+
+ /**
+ * Check the state of the Camel route
+ * @return true to close the route and false to leave open
+ */
+ boolean isReadyToBeClosed();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
index 04813b8..cda8018 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
+ * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
* this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
* thrown and the threshold setting.
*
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
* to determine if the processes that cause the route to be open are now available
*/
public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
- private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
private static final int STATE_CLOSED = 0;
private static final int STATE_HALF_OPEN = 1;
@@ -68,7 +68,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
// handler for half open circuit
// can be used instead of resuming route
// to check on resources
- private ThrottingExceptionHalfOpenHandler halfOpenHandler;
+ private ThrottlingExceptionHalfOpenHandler halfOpenHandler;
// stateful information
private Timer halfOpenTimer;
@@ -96,7 +96,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
@Override
public void onInit(Route route) {
- log.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
+ LOG.debug("initializing ThrottlingExceptionRoutePolicy route policy...");
logState();
}
@@ -127,7 +127,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
boolean answer = false;
if (exchange.getException() != null) {
- log.debug("exception occured on route: checking to see if I handle that");
+ LOG.debug("exception occured on route: checking to see if I handle that");
if (throttledExceptions == null || throttledExceptions.isEmpty()) {
// if no exceptions defined then always fail
// (ie) assume we throttle on all exceptions
@@ -143,9 +143,9 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
}
- if (log.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
- log.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
+ LOG.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
}
return answer;
}
@@ -157,31 +157,31 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
if (state.get() == STATE_CLOSED) {
if (failureLimitReached) {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
}
} else if (state.get() == STATE_HALF_OPEN) {
if (failureLimitReached) {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
} else {
- log.debug("closing circuit...");
+ LOG.debug("closing circuit...");
closeCircuit(route);
}
} else if (state.get() == STATE_OPEN) {
long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
if (halfOpenAfter <= elapsedTimeSinceOpened) {
- log.debug("checking an open circuit...");
+ LOG.debug("checking an open circuit...");
if (halfOpenHandler != null) {
if (halfOpenHandler.isReadyToBeClosed()) {
- log.debug("closing circuit...");
+ LOG.debug("closing circuit...");
closeCircuit(route);
} else {
- log.debug("opening circuit...");
+ LOG.debug("opening circuit...");
openCircuit(route);
}
} else {
- log.debug("half opening circuit...");
+ LOG.debug("half opening circuit...");
halfOpenCircuit(route);
}
}
@@ -254,8 +254,8 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
private void logState() {
- if (log.isDebugEnabled()) {
- log.debug(dumpState());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(dumpState());
}
}
@@ -293,11 +293,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
}
}
- public ThrottingExceptionHalfOpenHandler getHalfOpenHandler() {
+ public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
return halfOpenHandler;
}
- public void setHalfOpenHandler(ThrottingExceptionHalfOpenHandler halfOpenHandler) {
+ public void setHalfOpenHandler(ThrottlingExceptionHalfOpenHandler halfOpenHandler) {
this.halfOpenHandler = halfOpenHandler;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
deleted file mode 100644
index bace3a0..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
-
- private String url = "seda:foo?concurrentConsumers=20";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but should get there (yet)
- // due to open circuit
- // SEDA will queue it up
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(2);
- bodies = Arrays.asList(new String[]{"Message Three", "Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
- policy.setHalfOpenHandler(new AlwaysCloseHandler());
-
- from(url)
- .routePolicy(policy)
- .log("${body}")
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
- @Override
- public boolean isReadyToBeClosed() {
- return true;
- }
-
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
deleted file mode 100644
index b572888..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenHandlerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenHandlerTest.class);
-
- private String url = "direct:start";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but never should get there
- // due to open circuit
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(1);
- bodies = Arrays.asList(new String[]{"Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
- policy.setHalfOpenHandler(new AlwaysCloseHandler());
-
- from(url)
- .routePolicy(policy)
- .log("${body}")
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- public class AlwaysCloseHandler implements ThrottingExceptionHalfOpenHandler {
-
- @Override
- public boolean isReadyToBeClosed() {
- return true;
- }
-
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
deleted file mode 100644
index 8f4b993..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottingExceptionRoutePolicyHalfOpenTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThrottingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
- private static Logger log = LoggerFactory.getLogger(ThrottingExceptionRoutePolicyHalfOpenTest.class);
-
- private String url = "direct:start";
- private MockEndpoint result;
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- this.setUseRouteBuilder(true);
- result = getMockEndpoint("mock:result");
-
- context.getShutdownStrategy().setTimeout(1);
- }
-
- @Test
- public void testHalfOpenCircuit() throws Exception {
- result.reset();
- result.expectedMessageCount(2);
- List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- result.whenAnyExchangeReceived(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
- }
- });
-
- // send two messages which will fail
- sendMessage("Message One");
- sendMessage("Message Two");
-
- // wait long enough to
- // have the route shutdown
- Thread.sleep(3000);
-
- // send more messages
- // but never should get there
- // due to open circuit
- log.debug("sending message three");
- sendMessage("Message Three");
-
- assertMockEndpointsSatisfied();
-
- result.reset();
- result.expectedMessageCount(1);
- bodies = Arrays.asList(new String[]{"Message Four"});
- result.expectedBodiesReceivedInAnyOrder(bodies);
-
- // wait long enough for
- // half open attempt
- Thread.sleep(4000);
-
- // send message
- // should get through
- log.debug("sending message four");
- sendMessage("Message Four");
-
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- int threshold = 2;
- long failureWindow = 30;
- long halfOpenAfter = 5000;
- ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
-
- from(url)
- .routePolicy(policy)
- .to("log:foo?groupSize=10")
- .to("mock:result");
- }
- };
- }
-
- protected void sendMessage(String bodyText) {
- try {
- template.sendBody(url, bodyText);
- } catch (Exception e) {
- log.debug("Error sending:" + e.getCause().getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
deleted file mode 100644
index e779907..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottleException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor;
-
-public class ThrottleException extends RuntimeException {
-
- private static final long serialVersionUID = 1993185881371058773L;
-
- public ThrottleException() {
- super();
- }
-
- public ThrottleException(String message) {
- super(message);
- }
-
- public ThrottleException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ThrottleException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
new file mode 100644
index 0000000..101e9d1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+public class ThrottlingException extends RuntimeException {
+
+ private static final long serialVersionUID = 1993185881371058773L;
+
+ public ThrottlingException() {
+ super();
+ }
+
+ public ThrottlingException(String message) {
+ super(message);
+ }
+
+ public ThrottlingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ThrottlingException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
new file mode 100644
index 0000000..625989a
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.class);
+
+ private String url = "seda:foo?concurrentConsumers=20";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but should get there (yet)
+ // due to open circuit
+ // SEDA will queue it up
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(2);
+ bodies = Arrays.asList(new String[]{"Message Three", "Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
new file mode 100644
index 0000000..6684b6e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenHandlerTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but never should get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+ policy.setHalfOpenHandler(new AlwaysCloseHandler());
+
+ from(url)
+ .routePolicy(policy)
+ .log("${body}")
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public class AlwaysCloseHandler implements ThrottlingExceptionHalfOpenHandler {
+
+ @Override
+ public boolean isReadyToBeClosed() {
+ return true;
+ }
+
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
new file mode 100644
index 0000000..82c4f5e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottlingExceptionRoutePolicyHalfOpenTest extends ContextTestSupport {
+ private static Logger log = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicyHalfOpenTest.class);
+
+ private String url = "direct:start";
+ private MockEndpoint result;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.setUseRouteBuilder(true);
+ result = getMockEndpoint("mock:result");
+
+ context.getShutdownStrategy().setTimeout(1);
+ }
+
+ @Test
+ public void testHalfOpenCircuit() throws Exception {
+ result.reset();
+ result.expectedMessageCount(2);
+ List<String> bodies = Arrays.asList(new String[]{"Message One", "Message Two"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ result.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String msg = exchange.getIn().getBody(String.class);
+ exchange.setException(new ThrottlingException(msg));
+ }
+ });
+
+ // send two messages which will fail
+ sendMessage("Message One");
+ sendMessage("Message Two");
+
+ // wait long enough to
+ // have the route shutdown
+ Thread.sleep(3000);
+
+ // send more messages
+ // but never should get there
+ // due to open circuit
+ log.debug("sending message three");
+ sendMessage("Message Three");
+
+ assertMockEndpointsSatisfied();
+
+ result.reset();
+ result.expectedMessageCount(1);
+ bodies = Arrays.asList(new String[]{"Message Four"});
+ result.expectedBodiesReceivedInAnyOrder(bodies);
+
+ // wait long enough for
+ // half open attempt
+ Thread.sleep(4000);
+
+ // send message
+ // should get through
+ log.debug("sending message four");
+ sendMessage("Message Four");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ int threshold = 2;
+ long failureWindow = 30;
+ long halfOpenAfter = 5000;
+ ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);
+
+ from(url)
+ .routePolicy(policy)
+ .to("log:foo?groupSize=10")
+ .to("mock:result");
+ }
+ };
+ }
+
+ protected void sendMessage(String bodyText) {
+ try {
+ template.sendBody(url, bodyText);
+ } catch (Exception e) {
+ log.debug("Error sending:" + e.getCause().getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/35137521/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
index 91f4606..a2e982a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java
@@ -24,7 +24,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.ThrottingExceptionHalfOpenHandler;
+import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler;
import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +47,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
context.getShutdownStrategy().setTimeout(1);
}
-
@Test
public void testThrottlingRoutePolicyClosed() throws Exception {
result.expectedMinimumMessageCount(size);
@@ -60,7 +59,6 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
-
@Test
public void testOpenCircuitToPreventMessageThree() throws Exception {
result.reset();
@@ -72,7 +70,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
@Override
public void process(Exchange exchange) throws Exception {
String msg = exchange.getIn().getBody(String.class);
- exchange.setException(new ThrottleException(msg));
+ exchange.setException(new ThrottlingException(msg));
}
});
@@ -115,7 +113,7 @@ public class ThrottlingExceptionRoutePolicyTest extends ContextTestSupport {
};
}
- public class NeverCloseHandler implements ThrottingExceptionHalfOpenHandler {
+ public class NeverCloseHandler implements ThrottlingExceptionHalfOpenHandler {
@Override
public boolean isReadyToBeClosed() {