You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 16:05:53 UTC
svn commit: r956006 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/
main/java/org/apache/camel/processor/loadbalancer/
main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Fri Jun 18 14:05:52 2010
New Revision: 956006
URL: http://svn.apache.org/viewvc?rev=956006&view=rev
Log:
CAMEL-2723: Load balancers now support async routing engine. Well the topic could be improved to support it for real. But the others do it nicely.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java (contents, props changed)
- copied, changed from r955924, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalancerDefinition.java Fri Jun 18 14:05:52 2010
@@ -22,6 +22,7 @@ import javax.xml.bind.annotation.XmlAcce
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
@@ -128,4 +129,18 @@ public class LoadBalancerDefinition exte
loadBalancer.process(exchange);
}
+ public boolean process(Exchange exchange, final AsyncCallback callback) {
+ ObjectHelper.notNull(loadBalancer, "loadBalancer");
+ return loadBalancer.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // only handle the async case
+ if (doneSync) {
+ return;
+ } else {
+ callback.done(false);
+ }
+ }
+ });
+ }
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Fri Jun 18 14:05:52 2010
@@ -18,8 +18,11 @@ package org.apache.camel.processor.loadb
import java.util.List;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.ObjectHelper;
/**
@@ -93,9 +96,11 @@ public class FailOverLoadBalancer extend
return false;
}
- public void process(Exchange exchange) throws Exception {
- List<Processor> list = getProcessors();
- if (list.isEmpty()) {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ boolean sync;
+
+ List<Processor> processors = getProcessors();
+ if (processors.isEmpty()) {
throw new IllegalStateException("No processors available to process " + exchange);
}
@@ -104,7 +109,7 @@ public class FailOverLoadBalancer extend
// pick the first endpoint to use
if (isRoundRobin()) {
- if (++counter >= list.size()) {
+ if (++counter >= processors.size()) {
counter = 0;
}
index = counter;
@@ -113,10 +118,24 @@ public class FailOverLoadBalancer extend
log.debug("Failover starting with endpoint index " + index);
}
- Processor processor = list.get(index);
+ Processor processor = processors.get(index);
+
+ // process the first time, which indicate if we should continue synchronously or not
+ sync = processExchange(processor, exchange, attempts, index, callback, processors);
+
+ // continue as long its being processed synchronously
+ if (!sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the failover will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return false;
+ }
- // process the first time
- processExchange(processor, exchange, attempts);
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
+ }
// loop while we should fail over
while (shouldFailOver(exchange)) {
@@ -132,7 +151,7 @@ public class FailOverLoadBalancer extend
index++;
counter++;
- if (index >= list.size()) {
+ if (index >= processors.size()) {
// out of bounds
if (isRoundRobin()) {
log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
@@ -147,9 +166,26 @@ public class FailOverLoadBalancer extend
// try again but prepare exchange before we failover
prepareExchangeForFailover(exchange);
- processor = list.get(index);
- processExchange(processor, exchange, attempts);
+ processor = processors.get(index);
+ sync = processExchange(processor, exchange, attempts, index, callback, processors);
+
+ // continue as long its being processed synchronously
+ if (!sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the failover will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return false;
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
+ }
}
+
+ callback.done(true);
+ return true;
}
/**
@@ -167,17 +203,82 @@ public class FailOverLoadBalancer extend
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
}
- private void processExchange(Processor processor, Exchange exchange, int attempt) {
+ private boolean processExchange(final Processor processor, final Exchange exchange,
+ final int attempts, final int index, final AsyncCallback callback, final List<Processor> processors) {
+ boolean sync;
+
if (processor == null) {
throw new IllegalStateException("No processors could be chosen to process " + exchange);
}
- try {
- if (log.isDebugEnabled()) {
- log.debug("Processing failover at attempt " + attempt + " for exchange: " + exchange);
- }
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
+ if (log.isDebugEnabled()) {
+ log.debug("Processing failover at attempt " + attempts + " for exchange: " + exchange);
+ }
+
+ AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+ sync = albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
+
+ return sync;
+ }
+
+ /**
+ * Failover logic to be executed asynchronously if one of the failover endpoints
+ * is a real {@link AsyncProcessor}.
+ */
+ private final class FailOverAsyncCallback implements AsyncCallback {
+
+ private final Exchange exchange;
+ private int attempts;
+ private int index;
+ private final AsyncCallback callback;
+ private final List<Processor> processors;
+
+ private FailOverAsyncCallback(Exchange exchange, int attempts, int index, AsyncCallback callback, List<Processor> processors) {
+ this.exchange = exchange;
+ this.attempts = attempts;
+ this.index = index;
+ this.callback = callback;
+ this.processors = processors;
+ }
+
+ public void done(boolean doneSync) {
+ // should we failover?
+ if (shouldFailOver(exchange)) {
+ attempts++;
+ // are we exhausted by attempts?
+ if (maximumFailoverAttempts > -1 && attempts > maximumFailoverAttempts) {
+ if (log.isDebugEnabled()) {
+ log.debug("Braking out of failover after " + attempts + " failover attempts");
+ }
+ callback.done(false);
+ }
+
+ index++;
+ counter++;
+
+ if (index >= processors.size()) {
+ // out of bounds
+ if (isRoundRobin()) {
+ log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
+ index = 0;
+ counter = 0;
+ } else {
+ // no more processors to try
+ log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
+ callback.done(false);
+ }
+ }
+
+ // try again but prepare exchange before we failover
+ prepareExchangeForFailover(exchange);
+ Processor processor = processors.get(index);
+
+ // try to failover using the next processor
+ AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+ albp.process(exchange, this);
+ } else {
+ // we are done doing failover
+ callback.done(doneSync);
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java Fri Jun 18 14:05:52 2010
@@ -18,6 +18,7 @@ package org.apache.camel.processor.loadb
import java.util.List;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Processor;
/**
@@ -25,9 +26,7 @@ import org.apache.camel.Processor;
*
* @version $Revision$
*/
-public interface LoadBalancer extends Processor {
-
- // TODO: Should leverage AsyncProcessor
+public interface LoadBalancer extends AsyncProcessor {
/**
* Adds a new processor to the load balancer
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java Fri Jun 18 14:05:52 2010
@@ -20,9 +20,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -70,4 +72,8 @@ public abstract class LoadBalancerSuppor
removeProcessor(processor);
}
}
+
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Fri Jun 18 14:05:52 2010
@@ -18,8 +18,12 @@ package org.apache.camel.processor.loadb
import java.util.List;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.util.AsyncProcessorHelper;
/**
* A base class for {@link LoadBalancer} implementations which choose a single
@@ -29,7 +33,9 @@ import org.apache.camel.Processor;
*/
public abstract class QueueLoadBalancer extends LoadBalancerSupport {
- public void process(Exchange exchange) throws Exception {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ boolean sync;
+
List<Processor> list = getProcessors();
if (list.isEmpty()) {
throw new IllegalStateException("No processors available to process " + exchange);
@@ -38,8 +44,23 @@ public abstract class QueueLoadBalancer
if (processor == null) {
throw new IllegalStateException("No processors could be chosen to process " + exchange);
} else {
- processor.process(exchange);
+ AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
+ sync = albp.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ // only handle the async case
+ if (doneSync) {
+ return;
+ }
+ callback.done(false);
+ }
+ });
}
+
+ return sync;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
}
protected abstract Processor chooseProcessor(List<Processor> processors, Exchange exchange);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java Fri Jun 18 14:05:52 2010
@@ -18,30 +18,43 @@ package org.apache.camel.processor.loadb
import java.util.List;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
* A {@link LoadBalancer} implementations which sends to all destinations
- * (rather like JMS Topics).
- *
+ * (rather like JMS Topics).
+ * <p/>
+ * The {@link org.apache.camel.processor.MulticastProcessor} is more powerful as it offers
+ * option to run in parallel and decide whether or not to stop on failure etc.
+ *
* @version $Revision$
*/
public class TopicLoadBalancer extends LoadBalancerSupport {
- public void process(Exchange exchange) throws Exception {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
List<Processor> list = getProcessors();
+ // too hard to do multiple async, so we do it sync
for (Processor processor : list) {
- Exchange copy = copyExchangeStrategy(processor, exchange);
- processor.process(copy);
+ try {
+ Exchange copy = copyExchangeStrategy(processor, exchange);
+ processor.process(copy);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ // stop on failure
+ break;
+ }
}
+ callback.done(true);
+ return true;
}
/**
* Strategy method to copy the exchange before sending to another endpoint.
* Derived classes such as the {@link org.apache.camel.processor.Pipeline Pipeline}
* will not clone the exchange
- *
+ *
* @param processor the processor that will send the exchange
* @param exchange the exchange
* @return the current exchange if no copying is required such as for a
@@ -54,5 +67,4 @@ public class TopicLoadBalancer extends L
public String toString() {
return "TopicLoadBalancer";
}
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Fri Jun 18 14:05:52 2010
@@ -40,7 +40,7 @@ public final class AsyncProcessorHelper
* for it to complete before returning. This can be used by {@link AsyncProcessor}
* objects to implement their sync version of the process method.
*/
- public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
+ public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
boolean sync = processor.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
@@ -48,6 +48,11 @@ public final class AsyncProcessorHelper
latch.countDown();
}
}
+
+ @Override
+ public String toString() {
+ return "Done " + processor;
+ }
});
if (!sync) {
latch.await();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java?rev=956006&r1=956005&r2=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomLoadBalanceTest.java Fri Jun 18 14:05:52 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
@@ -65,15 +66,21 @@ public class CustomLoadBalanceTest exten
private class MyLoadBalancer extends LoadBalancerSupport {
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
String body = exchange.getIn().getBody(String.class);
- if ("x".equals(body)) {
- getProcessors().get(0).process(exchange);
- } else if ("y".equals(body)) {
- getProcessors().get(1).process(exchange);
- } else {
- getProcessors().get(2).process(exchange);
+ try {
+ if ("x".equals(body)) {
+ getProcessors().get(0).process(exchange);
+ } else if ("y".equals(body)) {
+ getProcessors().get(1).process(exchange);
+ } else {
+ getProcessors().get(2).process(exchange);
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
}
+ callback.done(true);
+ return true;
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java?rev=956006&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java Fri Jun 18 14:05:52 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointDualMulticastTest extends ContextTestSupport {
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .multicast()
+ .to("async:Hi Camel").to("async:Bye Camel")
+ .end()
+ // fully asynchronous support for multicasting is too complex
+ // so the main thread will block and wait for both to complete
+ // before it routes to the end
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualMulticastTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java?rev=956006&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java Fri Jun 18 14:05:52 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointDualTopicLoadBalanceTest extends ContextTestSupport {
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello Camel");
+
+ template.sendBody("direct:start", "Hello Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .loadBalance()
+ .topic()
+ .to("async:Bye Camel", "async:Bye World")
+ .end()
+ .to("log:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDualTopicLoadBalanceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java?rev=956006&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java Fri Jun 18 14:05:52 2010
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailOverLoadBalanceTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye World", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .loadBalance()
+ .failover()
+ // the last would succeed
+ // and make it complex by having a direct endpoint which is not a real async processor
+ .to("async:Bye Camel?failFirstAttempts=5", "direct:fail", "async:Bye Moon?failFirstAttempts=5", "async:Bye World")
+ .end()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+
+ from("direct:fail")
+ .to("log:fail")
+ .to("mock:fail")
+ .throwException(new IllegalArgumentException("Damn"));
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java (from r955924, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955924&r2=956006&rev=956006&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java Fri Jun 18 14:05:52 2010
@@ -24,19 +24,22 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointRoundRobinLoadBalanceTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
- getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
- getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel", "Hello World");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel", "Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel", "Bye World");
String reply = template.requestBody("direct:start", "Hello Camel", String.class);
assertEquals("Bye Camel", reply);
+ reply = template.requestBody("direct:start", "Hello World", String.class);
+ assertEquals("Bye World", reply);
+
assertMockEndpointsSatisfied();
assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
@@ -57,7 +60,10 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .loadBalance()
+ .roundRobin()
+ .to("async:Bye Camel", "async:Bye World")
+ .end()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
@@ -70,4 +76,4 @@ public class AsyncEndpointTest extends C
};
}
-}
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoundRobinLoadBalanceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java?rev=956006&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java Fri Jun 18 14:05:52 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointSingleTopicLoadBalanceTest extends ContextTestSupport {
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello Camel");
+
+ template.sendBody("direct:start", "Hello Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .loadBalance()
+ .topic()
+ .to("async:Bye Camel")
+ .end()
+ .to("log:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSingleTopicLoadBalanceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date