You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/19 14:01:02 UTC

svn commit: r956216 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/loadbalancer/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Sat Jun 19 12:01:02 2010
New Revision: 956216

URL: http://svn.apache.org/viewvc?rev=956216&view=rev
Log:
CAMEL-2723: Reworked the failover LB to mirror the pipeline, as it's just a specialized Pipeline. This ensures the async routing engine works flawlessly. On top of that, others can now understand and maintain the code. I was about to give beers if you could understand the previous code :)

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java
      - copied, changed from r956212, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Sat Jun 19 12:01:02 2010
@@ -181,6 +181,9 @@ public class Pipeline extends MulticastP
                     nextExchange = createNextExchange(nextExchange);
                     sync = process(original, nextExchange, callback, processors, processor);
                     if (!sync) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                        }
                         return;
                     }
                 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Sat Jun 19 12:01:02 2010
@@ -28,6 +28,10 @@ import org.apache.camel.util.ObjectHelpe
 
 /**
  * This FailOverLoadBalancer will failover to use next processor when an exception occurred
+ * <p/>
+ * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
+ * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
+ * pipeline to ensure it works the same and the async routing engine is flawless.
  */
 public class FailOverLoadBalancer extends LoadBalancerSupport {
 
@@ -45,6 +49,7 @@ public class FailOverLoadBalancer extend
     public FailOverLoadBalancer(List<Class<?>> exceptions) {
         this.exceptions = exceptions;
 
+        // validate its all exception types
         for (Class<?> type : exceptions) {
             if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
                 throw new IllegalArgumentException("Class is not an instance of Throwable: " + type);
@@ -98,17 +103,13 @@ public class FailOverLoadBalancer extend
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        boolean sync;
-
-        List<Processor> processors = getProcessors();
-        if (processors.isEmpty()) {
-            throw new IllegalStateException("No processors available to process " + exchange);
-        }
+        final List<Processor> processors = getProcessors();
 
         final AtomicInteger index = new AtomicInteger();
         final AtomicInteger attempts = new AtomicInteger();
+        boolean first = true;
 
-        // pick the first endpoint to use
+        // get the next processor
         if (isRoundRobin()) {
             if (counter.incrementAndGet() >= processors.size()) {
                 counter.set(0);
@@ -119,19 +120,64 @@ public class FailOverLoadBalancer extend
             log.debug("Failover starting with endpoint index " + index);
         }
 
-        Processor processor = processors.get(index.get());
+        while (first || shouldFailOver(exchange)) {
+            if (!first) {
+                attempts.incrementAndGet();
+                // are we exhausted by attempts?
+                if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Braking out of failover after " + attempts + " failover attempts");
+                    }
+                    break;
+                }
+
+                index.incrementAndGet();
+                counter.incrementAndGet();
+            } else {
+                // flip first switch
+                first = false;
+            }
+
+            if (index.get() >= processors.size()) {
+                // out of bounds
+                if (isRoundRobin()) {
+                    log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
+                    index.set(0);
+                    counter.set(0);
+                } else {
+                    // no more processors to try
+                    log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
+                    break;
+                }
+            }
 
-        // process the failover
-        sync = processExchange(processor, exchange, attempts, index, callback, processors);
+            // try again but prepare exchange before we failover
+            prepareExchangeForFailover(exchange);
+            Processor processor = processors.get(index.get());
+
+            // process the exchange
+            boolean sync = processExchange(processor, exchange, attempts, index, callback, processors);
+
+            // continue as long its being processed synchronously
+            if (!sync) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                }
+                // the remainder of the pipeline will be completed async
+                // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                return false;
+            }
 
-        // continue as long its being processed synchronously
-        if (!sync) {
             if (log.isTraceEnabled()) {
-                log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
             }
-            // the remainder of the failover will be completed async
-            // so we break out now, then the callback will be invoked which then continue routing from where we left here
-            return false;
+        }
+
+        if (log.isTraceEnabled()) {
+            // logging nextExchange as it contains the exchange that might have altered the payload and since
+            // we are logging the completion if will be confusing if we log the original instead
+            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
+            log.trace("Failover complete for exchangeId: " + exchange.getExchangeId() + " >>> " + exchange);
         }
 
         callback.done(true);
@@ -164,8 +210,7 @@ public class FailOverLoadBalancer extend
         }
 
         AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
-        boolean sync = albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
-        return sync;
+        return albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
     }
 
     /**
@@ -189,16 +234,19 @@ public class FailOverLoadBalancer extend
         }
 
         public void done(boolean doneSync) {
-            // should we failover?
-            if (shouldFailOver(exchange)) {
+            // we only have to handle async completion of the pipeline
+            if (doneSync) {
+                return;
+            }
+
+            while (shouldFailOver(exchange)) {
                 attempts.incrementAndGet();
                 // are we exhausted by attempts?
                 if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
                     if (log.isDebugEnabled()) {
                         log.debug("Braking out of failover after " + attempts + " failover attempts");
                     }
-                    callback.done(doneSync);
-                    return;
+                    break;
                 }
 
                 index.incrementAndGet();
@@ -213,8 +261,7 @@ public class FailOverLoadBalancer extend
                     } else {
                         // no more processors to try
                         log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
-                        callback.done(doneSync);
-                        return;
+                        break;
                     }
                 }
 
@@ -223,13 +270,20 @@ public class FailOverLoadBalancer extend
                 Processor processor = processors.get(index.get());
 
                 // try to failover using the next processor
-                AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
-                albp.process(exchange, this);
-            } else {
-                // we are done doing failover
-                callback.done(doneSync);
+                doneSync = processExchange(processor, exchange, attempts, index, callback, processors);
+                if (!doneSync) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
+                    }
+                    // the remainder of the pipeline will be completed async
+                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                    return;
+                }
             }
-        }
+
+            // signal callback we are done
+            callback.done(false);
+        };
     }
 
     public String toString() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java?rev=956216&r1=956215&r2=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java Sat Jun 19 12:01:02 2010
@@ -30,18 +30,17 @@ public class AsyncEndpointFailOverLoadBa
     private static String afterThreadName;
 
     public void testAsyncEndpoint() throws Exception {
-        // TODO: Fix me with async load balancer
-        //getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        //getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
-        //getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
-        //getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
 
-        //String reply = template.requestBody("direct:start", "Hello Camel", String.class);
-        //assertEquals("Bye World", reply);
+        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+        assertEquals("Bye World", reply);
 
-        //assertMockEndpointsSatisfied();
+        assertMockEndpointsSatisfied();
 
-        // assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -61,8 +60,8 @@ public class AsyncEndpointFailOverLoadBa
                         })
                         .loadBalance()
                         .failover()
-                                // first is sync, the 2nd is async based
-                        .to("direct:fail", "async:Bye World")
+                            // first is sync, the 2nd is async based
+                            .to("direct:fail", "async:Bye World")
                         .end()
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java (from r956212, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java&r1=956212&r2=956216&rev=956216&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed3Test.java Sat Jun 19 12:01:02 2010
@@ -24,24 +24,23 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointFailOverLoadBalanceMixed2Test extends ContextTestSupport {
+public class AsyncEndpointFailOverLoadBalanceMixed3Test extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
 
     public void testAsyncEndpoint() throws Exception {
-        // TODO: Fix me with async load balancer
-        //getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        //getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
-        //getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
-        //getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:ok").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
 
-        //String reply = template.requestBody("direct:start", "Hello Camel", String.class);
-        //assertEquals("Bye World", reply);
+        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+        assertEquals("Bye World", reply);
 
-        //assertMockEndpointsSatisfied();
+        assertMockEndpointsSatisfied();
 
-        // assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -61,8 +60,8 @@ public class AsyncEndpointFailOverLoadBa
                         })
                         .loadBalance()
                         .failover()
-                                // first is sync, the 2nd is async based
-                        .to("direct:fail", "async:Bye World")
+                            // first is async, the 2nd is sync based
+                            .to("async:Bye World?failFirstAttempts=5", "direct:ok")
                         .end()
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
@@ -74,10 +73,10 @@ public class AsyncEndpointFailOverLoadBa
                         .to("mock:after")
                         .to("mock:result");
 
-                from("direct:fail")
-                        .to("log:fail")
-                        .to("mock:fail")
-                        .throwException(new IllegalArgumentException("Damn"));
+                from("direct:ok")
+                        .to("log:pok")
+                        .to("mock:ok")
+                        .transform(constant("Bye World"));
             }
         };
     }