You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/19 11:35:47 UTC
svn commit: r956208 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/loadbalancer/
test/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/ test/resources/
Author: davsclaus
Date: Sat Jun 19 09:35:46 2010
New Revision: 956208
URL: http://svn.apache.org/viewvc?rev=956208&view=rev
Log:
CAMEL-2723: Fixed failover load balancer with async processing. One issue remains when sync and async endpoints are mixed and the first endpoint is sync based.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java (contents, props changed)
- copied, changed from r956009, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceOnlyAsyncTest.java
- copied, changed from r956009, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
Removed:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalanceWrappedExceptionNoLuckTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverMaximumFailoverAttemptsTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
camel/trunk/camel-core/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=956208&r1=956207&r2=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Sat Jun 19 09:35:46 2010
@@ -17,6 +17,7 @@
package org.apache.camel.processor.loadbalancer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -35,7 +36,7 @@ public class FailOverLoadBalancer extend
private int maximumFailoverAttempts = -1;
// stateful counter
- private int counter = -1;
+ private final AtomicInteger counter = new AtomicInteger(-1);
public FailOverLoadBalancer() {
this.exceptions = null;
@@ -104,23 +105,23 @@ public class FailOverLoadBalancer extend
throw new IllegalStateException("No processors available to process " + exchange);
}
- int index = 0;
- int attempts = 0;
+ final AtomicInteger index = new AtomicInteger();
+ final AtomicInteger attempts = new AtomicInteger();
// pick the first endpoint to use
if (isRoundRobin()) {
- if (++counter >= processors.size()) {
- counter = 0;
+ if (counter.incrementAndGet() >= processors.size()) {
+ counter.set(0);
}
- index = counter;
+ index.set(counter.get());
}
if (log.isDebugEnabled()) {
log.debug("Failover starting with endpoint index " + index);
}
- Processor processor = processors.get(index);
+ Processor processor = processors.get(index.get());
- // process the first time, which indicate if we should continue synchronously or not
+ // process the failover
sync = processExchange(processor, exchange, attempts, index, callback, processors);
// continue as long its being processed synchronously
@@ -133,57 +134,6 @@ public class FailOverLoadBalancer extend
return false;
}
- if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
- }
-
- // loop while we should fail over
- while (shouldFailOver(exchange)) {
- attempts++;
- // are we exhausted by attempts?
- if (maximumFailoverAttempts > -1 && attempts > maximumFailoverAttempts) {
- if (log.isDebugEnabled()) {
- log.debug("Braking out of failover after " + attempts + " failover attempts");
- }
- break;
- }
-
- index++;
- counter++;
-
- if (index >= processors.size()) {
- // out of bounds
- if (isRoundRobin()) {
- log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
- index = 0;
- counter = 0;
- } else {
- // no more processors to try
- log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
- break;
- }
- }
-
- // try again but prepare exchange before we failover
- prepareExchangeForFailover(exchange);
- processor = processors.get(index);
- sync = processExchange(processor, exchange, attempts, index, callback, processors);
-
- // continue as long its being processed synchronously
- if (!sync) {
- if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously");
- }
- // the remainder of the failover will be completed async
- // so we break out now, then the callback will be invoked which then continue routing from where we left here
- return false;
- }
-
- if (log.isTraceEnabled()) {
- log.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously");
- }
- }
-
callback.done(true);
return true;
}
@@ -203,10 +153,9 @@ public class FailOverLoadBalancer extend
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
}
- private boolean processExchange(final Processor processor, final Exchange exchange,
- final int attempts, final int index, final AsyncCallback callback, final List<Processor> processors) {
- boolean sync;
-
+ private boolean processExchange(Processor processor, Exchange exchange,
+ AtomicInteger attempts, AtomicInteger index,
+ AsyncCallback callback, List<Processor> processors) {
if (processor == null) {
throw new IllegalStateException("No processors could be chosen to process " + exchange);
}
@@ -215,8 +164,7 @@ public class FailOverLoadBalancer extend
}
AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
- sync = albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
-
+ boolean sync = albp.process(exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
return sync;
}
@@ -227,12 +175,12 @@ public class FailOverLoadBalancer extend
private final class FailOverAsyncCallback implements AsyncCallback {
private final Exchange exchange;
- private int attempts;
- private int index;
+ private final AtomicInteger attempts;
+ private final AtomicInteger index;
private final AsyncCallback callback;
private final List<Processor> processors;
- private FailOverAsyncCallback(Exchange exchange, int attempts, int index, AsyncCallback callback, List<Processor> processors) {
+ private FailOverAsyncCallback(Exchange exchange, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) {
this.exchange = exchange;
this.attempts = attempts;
this.index = index;
@@ -243,34 +191,35 @@ public class FailOverLoadBalancer extend
public void done(boolean doneSync) {
// should we failover?
if (shouldFailOver(exchange)) {
- attempts++;
+ attempts.incrementAndGet();
// are we exhausted by attempts?
- if (maximumFailoverAttempts > -1 && attempts > maximumFailoverAttempts) {
+ if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
if (log.isDebugEnabled()) {
log.debug("Braking out of failover after " + attempts + " failover attempts");
}
- callback.done(false);
+ callback.done(doneSync);
+ return;
}
- index++;
- counter++;
+ index.incrementAndGet();
+ counter.incrementAndGet();
- if (index >= processors.size()) {
+ if (index.get() >= processors.size()) {
// out of bounds
if (isRoundRobin()) {
log.debug("Failover is round robin enabled and therefore starting from the first endpoint");
- index = 0;
- counter = 0;
+ index.set(0);
+ counter.set(0);
} else {
// no more processors to try
log.debug("Braking out of failover as we reach the end of endpoints to use for failover");
- callback.done(false);
+ callback.done(doneSync);
}
}
// try again but prepare exchange before we failover
prepareExchangeForFailover(exchange);
- Processor processor = processors.get(index);
+ Processor processor = processors.get(index.get());
// try to failover using the next processor
AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalanceWrappedExceptionNoLuckTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalanceWrappedExceptionNoLuckTest.java?rev=956208&r1=956207&r2=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalanceWrappedExceptionNoLuckTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalanceWrappedExceptionNoLuckTest.java Sat Jun 19 09:35:46 2010
@@ -66,6 +66,7 @@ public class FailOverLoadBalanceWrappedE
try {
template.sendBody("direct:start", "Hello World");
+ fail("Should have thrown exception");
} catch (CamelExecutionException e) {
assertEquals("Forced", e.getCause().getMessage());
assertIsInstanceOf(IOException.class, e.getCause());
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverMaximumFailoverAttemptsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverMaximumFailoverAttemptsTest.java?rev=956208&r1=956207&r2=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverMaximumFailoverAttemptsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverMaximumFailoverAttemptsTest.java Sat Jun 19 09:35:46 2010
@@ -64,18 +64,22 @@ public class FailoverMaximumFailoverAtte
to("direct:bad", "direct:bad2", "direct:bad3", "direct:good");
from("direct:bad")
+ .to("log:bad")
.to("mock:bad")
.throwException(new IllegalArgumentException("Damn"));
from("direct:bad2")
+ .to("log:bad2")
.to("mock:bad2")
.throwException(new IllegalArgumentException("Damn Again"));
from("direct:bad3")
+ .to("log:bad3")
.to("mock:bad3")
.throwException(new IllegalArgumentException("Damn Again Again"));
from("direct:good")
+ .to("log:good")
.to("mock:good");
}
};
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java?rev=956208&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java Sat Jun 19 09:35:46 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailOverLoadBalanceMixed2Test extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ // TODO: Fix me with async load balancer
+ //getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ //getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
+ //getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+ //getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+ //String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ //assertEquals("Bye World", reply);
+
+ //assertMockEndpointsSatisfied();
+
+ // assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .loadBalance()
+ .failover()
+ // first is sync, the 2nd is async based
+ .to("direct:fail", "async:Bye World")
+ .end()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // because the first endpoint is sync, processing waits for it and thus continues on the same thread
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+
+ from("direct:fail")
+ .to("log:fail")
+ .to("mock:fail")
+ .throwException(new IllegalArgumentException("Damn"));
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixed2Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java (from r956009, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java&r1=956009&r2=956208&rev=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java Sat Jun 19 09:35:46 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointFailOverLoadBalanceTest extends ContextTestSupport {
+public class AsyncEndpointFailOverLoadBalanceMixedTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceMixedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceOnlyAsyncTest.java (from r956009, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceOnlyAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceOnlyAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java&r1=956009&r2=956208&rev=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailOverLoadBalanceOnlyAsyncTest.java Sat Jun 19 09:35:46 2010
@@ -24,14 +24,13 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointFailOverLoadBalanceTest extends ContextTestSupport {
+public class AsyncEndpointFailOverLoadBalanceOnlyAsyncTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:fail").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
@@ -61,8 +60,7 @@ public class AsyncEndpointFailOverLoadBa
.loadBalance()
.failover()
// the last would succeed
- // and make it complex by having a direct endpoint which is not a real async processor
- .to("async:Bye Camel?failFirstAttempts=5", "direct:fail", "async:Bye Moon?failFirstAttempts=5", "async:Bye World")
+ .to("async:Bye Camel?failFirstAttempts=5", "async:Bye Moon?failFirstAttempts=5", "async:Bye World")
.end()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
@@ -72,11 +70,6 @@ public class AsyncEndpointFailOverLoadBa
.to("log:after")
.to("mock:after")
.to("mock:result");
-
- from("direct:fail")
- .to("log:fail")
- .to("mock:fail")
- .throwException(new IllegalArgumentException("Damn"));
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java?rev=956208&r1=956207&r2=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java Sat Jun 19 09:35:46 2010
@@ -28,7 +28,7 @@ import org.apache.camel.impl.DefaultEndp
public class MyAsyncEndpoint extends DefaultEndpoint {
private String reply;
- private long delay = 500;
+ private long delay = 1000;
private int failFirstAttempts;
public MyAsyncEndpoint(String endpointUri, Component component) {
Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=956208&r1=956207&r2=956208&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Sat Jun 19 09:35:46 2010
@@ -28,6 +28,7 @@ log4j.logger.org.apache.activemq.spring=
#log4j.logger.org.apache.camel.component.mock=DEBUG
#log4j.logger.org.apache.camel.component.file=TRACE
#log4j.logger.org.apache.camel.processor.Pipeline=TRACE
+#log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
log4j.logger.org.apache.camel.impl.converter=WARN
log4j.logger.org.apache.camel.management=WARN
log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN