You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/19 14:01:02 UTC
svn commit: r956216 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/loadbalancer/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Sat Jun 19 12:01:02 2010
New Revision: 956216
URL: http://svn.apache.org/viewvc?rev=956216&view=rev
Log:
CAMEL-2723: Reworked failover LB as to mirror the pipeline as its just a specialized Pipeline. This ensures the async routing engine works flawless. And on top others can understand and maintain the code. I was about to give beers if you could understand the previous code :)
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java
- copied, changed from r956212, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Sat Jun 19 12:01:02 2010
@@ -181,6 +181,9 @@ public class Pipeline extends MulticastP
nextExchange = createNextExchange(nextExchange);
sync = process(original, nextExchange, callback, processors, processor);
if (!sync) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
return;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Sat Jun 19 12:01:02 2010
@@ -28,6 +28,10 @@ import org.apache.camel.util.ObjectHelpe
/**
* This FailOverLoadBalancer will failover to use next processor when an exception occurred
+ * <p/>
+ * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
+ * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
+ * pipeline to ensure it works the same and the async routing engine is flawless.
*/
public class FailOverLoadBalancer extends LoadBalancerSupport {
@@ -45,6 +49,7 @@ public class FailOverLoadBalancer extend
public FailOverLoadBalancer(List<Class<?>> exceptions) {
this.exceptions = exceptions;
+ // validate its all exception types
for (Class<?> type : exceptions) {
if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
throw new IllegalArgumentException("Class is not an instance of Throwable: " + type);
@@ -98,17 +103,13 @@ public class FailOverLoadBalancer extend
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- boolean sync;
-
- List<Processor> processors = getProcessors();
- if (processors.isEmpty()) {
- throw new IllegalStateException("No processors available to process " + exchange);
- }
+ final List<Processor> processors = getProcessors();
final AtomicInteger index = new AtomicInteger();
final AtomicInteger attempts = new AtomicInteger();
+ boolean first = true;
- // pick the first endpoint to use
+ // get the next processor
if (isRoundRobin()) {
if (counter.incrementAndGet() >= processors.size()) {
counter.set(0);
@@ -119,19 +120,64 @@ public class FailOverLoadBalancer extend
log.debug("Failover starting with endpoint index " + index);
}
- Processor processor = processors.get(index.get());
+ while (first || shouldFailOver(exchange)) {
+ if (!first) {
+ attempts.incrementAndGet();
+ // are we exhausted by attempts?
+ if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
+ if (log.isDebugEnabled()) {
+ log.debug("Braking out of failover after " + attempts + " failover attempts");
+ }
+ break;
+ }
+
+ index.incrementAndGet();
+ counter.incrementAndGet();
+ } else {
+ // flip first switch
+ first = false;
+ }
+
+ if (index.get() >= processors.size()) {
+ // out of bounds
+ if (isRoundRobin()) {
+ log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
+ index.set(0);
+ counter.set(0);
+ } else {
+ // no more processors to try
+ log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
+ break;
+ }
+ }
- // process the failover
- sync = processExchange(processor, exchange, attempts, index, callback, processors);
+ // try again but prepare exchange before we failover
+ prepareExchangeForFailover(exchange);
+ Processor processor = processors.get(index.get());
+
+ // process the exchange
+ boolean sync = processExchange(processor, exchange, attempts, index, callback, processors);
+
+ // continue as long its being processed synchronously
+ if (!sync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the pipeline will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return false;
+ }
- // continue as long its being processed synchronously
- if (!sync) {
if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
}
- // the remainder of the failover will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
+ }
+
+ if (log.isTraceEnabled()) {
+ // logging nextExchange as it contains the exchange that might have altered the payload and since
+ // we are logging the completion if will be confusing if we log the original instead
+ // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+ log.trace("Failover complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
}
callback.done(true);
@@ -164,8 +210,7 @@ public class FailOverLoadBalancer extend
}
AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
- boolean sync = albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
- return sync;
+ return albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
}
/**
@@ -189,16 +234,19 @@ public class FailOverLoadBalancer extend
}
public void done(boolean doneSync) {
- // should we failover?
- if (shouldFailOver(exchange)) {
+ // we only have to handle async completion of the pipeline
+ if (doneSync) {
+ return;
+ }
+
+ while (shouldFailOver(exchange)) {
attempts.incrementAndGet();
// are we exhausted by attempts?
if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
if (log.isDebugEnabled()) {
log.debug("Braking out of failover after " + attempts + " failover attempts");
}
- callback.done(doneSync);
- return;
+ break;
}
index.incrementAndGet();
@@ -213,8 +261,7 @@ public class FailOverLoadBalancer extend
} else {
// no more processors to try
log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
- callback.done(doneSync);
- return;
+ break;
}
}
@@ -223,13 +270,20 @@ public class FailOverLoadBalancer extend
Processor processor = processors.get(index.get());
// try to failover using the next processor
- AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
- albp.process(exchange, this);
- } else {
- // we are done doing failover
- callback.done(doneSync);
+ doneSync = processExchange(processor, exchange, attempts, index, callback, processors);
+ if (!doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+ }
+ // the remainder of the pipeline will be completed async
+ // so we break out now, then the callback will be invoked which then continue routing from where we left here
+ return;
+ }
}
- }
+
+ // signal callback we are done
+ callback.done(false);
+ };
}
public String toString() {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java Sat Jun 19 12:01:02 2010
@@ -30,18 +30,17 @@ public class AsyncEndpointFailOverLoadBa
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
- // TODO: Fix me with async load balancer
- //getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- //getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
- //getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
- //getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
- //String reply = template.requestBody("direct:start", "Hello Camel", String.class);
- //assertEquals("Bye World", reply);
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye World", reply);
- //assertMockEndpointsSatisfied();
+ assertMockEndpointsSatisfied();
- // assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@Override
@@ -61,8 +60,8 @@ public class AsyncEndpointFailOverLoadBa
})
.loadBalance()
.failover()
- // first is sync, the 2nd is async based
- .to("direct:fail", "async:Bye World")
+ // first is sync, the 2nd is async based
+ .to("direct:fail", "async:Bye World")
.end()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java (from r956212, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java&r1=956212&r2=956216&rev=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java Sat Jun 19 12:01:02 2010
@@ -24,24 +24,23 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointFailOverLoadBalanceMixed2Test extends ContextTestSupport {
+public class AsyncEndpointFailOverLoadBalanceMixed3Test extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
- // TODO: Fix me with async load balancer
- //getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- //getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
- //getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
- //getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:ok").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
- //String reply = template.requestBody("direct:start", "Hello Camel", String.class);
- //assertEquals("Bye World", reply);
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye World", reply);
- //assertMockEndpointsSatisfied();
+ assertMockEndpointsSatisfied();
- // assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
}
@Override
@@ -61,8 +60,8 @@ public class AsyncEndpointFailOverLoadBa
})
.loadBalance()
.failover()
- // first is sync, the 2nd is async based
- .to("direct:fail", "async:Bye World")
+ // first is async, the 2nd is sync based
+ .to("async:Bye World?failFirstAttempts=5", "direct:ok")
.end()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
@@ -74,10 +73,10 @@ public class AsyncEndpointFailOverLoadBa
.to("mock:after")
.to("mock:result");
- from("direct:fail")
- .to("log:fail")
- .to("mock:fail")
- .throwException(new IllegalArgumentException("Damn"));
+ from("direct:ok")
+ .to("log:pok")
+ .to("mock:ok")
+ .transform(constant("Bye World"));
}
};
}