You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/17 08:13:41 UTC
svn commit: r955488 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Thu Jun 17 06:13:40 2010
New Revision: 955488
URL: http://svn.apache.org/viewvc?rev=955488&view=rev
Log:
CAMEL-2723: Added more tests.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterTest.java
- copied, changed from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java
- copied, changed from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=955488&r1=955487&r2=955488&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Thu Jun 17 06:13:40 2010
@@ -97,10 +97,15 @@ public final class UnitOfWorkProcessor e
private void doneUow(DefaultUnitOfWork uow, Exchange exchange) {
// unit of work is done
- exchange.getUnitOfWork().done(exchange);
+ try {
+ exchange.getUnitOfWork().done(exchange);
+ } catch (Throwable e) {
+ LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
+ + ". This exception will be ignored.");
+ }
try {
uow.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
+ ". This exception will be ignored.");
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java?rev=955488&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java Thu Jun 17 06:13:40 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailedAfterDeadLetterChannelTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.sendBody("direct:start", "Hello Camel");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).useOriginalMessage());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterDeadLetterChannelTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java?rev=955488&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java Thu Jun 17 06:13:40 2010
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailedAfterOnExceptionContinueTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ // and we should keep the exception so we know what caused the failure
+ getMockEndpoint("mock:result").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class);
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ // tell Camel to handle and continue when this exception is thrown
+ onException(IllegalArgumentException.class).continued(true);
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionContinueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java?rev=955488&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java Thu Jun 17 06:13:40 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointFailedAfterOnExceptionTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:error").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("FAIL", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ onException(IllegalArgumentException.class).handled(true).to("mock:error").transform(constant("FAIL"));
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterOnExceptionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterTest.java (from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955480&r2=955488&rev=955488&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointFailedAfterTest.java Thu Jun 17 06:13:40 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.async;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -24,7 +25,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointFailedAfterTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -32,10 +33,15 @@ public class AsyncEndpointTest extends C
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
- getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
- String reply = template.requestBody("direct:start", "Hello Camel", String.class);
- assertEquals("Bye Camel", reply);
+ try {
+ template.requestBody("direct:start", "Hello Camel", String.class);
+ fail("Should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Damn", e.getCause().getMessage());
+ }
assertMockEndpointsSatisfied();
@@ -65,9 +71,10 @@ public class AsyncEndpointTest extends C
})
.to("log:after")
.to("mock:after")
+ .throwException(new IllegalArgumentException("Damn"))
.to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java?rev=955488&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java Thu Jun 17 06:13:40 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.SynchronizationAdapter;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointUoWFailedTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+ private MySynchronization sync = new MySynchronization();
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ try {
+ template.requestBody("direct:start", "Hello Camel", String.class);
+ fail("Should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Damn", e.getCause().getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+
+ // wait a bit to ensure UoW has been run
+ Thread.sleep(1000);
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertEquals(0, sync.isOnComplete());
+ assertEquals(1, sync.isOnFailure());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ exchange.addOnCompletion(sync);
+ }
+ })
+ .to("mock:before")
+ .to("log:before")
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .throwException(new IllegalArgumentException("Damn"))
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MySynchronization extends SynchronizationAdapter {
+
+ private AtomicInteger onComplete = new AtomicInteger();
+ private AtomicInteger onFailure = new AtomicInteger();
+
+ public void onComplete(Exchange exchange) {
+ onComplete.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ onFailure.incrementAndGet();
+ }
+
+ public int isOnComplete() {
+ return onComplete.get();
+ }
+
+ public int isOnFailure() {
+ return onFailure.get();
+ }
+
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java (from r955480, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955480&r2=955488&rev=955488&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java Thu Jun 17 06:13:40 2010
@@ -16,18 +16,23 @@
*/
package org.apache.camel.processor.async;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.SynchronizationAdapter;
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointUoWTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
+ private MySynchronization sync = new MySynchronization();
public void testAsyncEndpoint() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
@@ -39,7 +44,12 @@ public class AsyncEndpointTest extends C
assertMockEndpointsSatisfied();
+ // wait a bit to ensure UoW has been run
+ Thread.sleep(1000);
+
assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertEquals(1, sync.isOnComplete());
+ assertEquals(0, sync.isOnFailure());
}
@Override
@@ -50,13 +60,14 @@ public class AsyncEndpointTest extends C
context.addComponent("async", new MyAsyncComponent());
from("direct:start")
- .to("mock:before")
- .to("log:before")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
beforeThreadName = Thread.currentThread().getName();
+ exchange.addOnCompletion(sync);
}
})
+ .to("mock:before")
+ .to("log:before")
.to("async:Bye Camel")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
@@ -70,4 +81,28 @@ public class AsyncEndpointTest extends C
};
}
-}
+ private class MySynchronization extends SynchronizationAdapter {
+
+ private AtomicInteger onComplete = new AtomicInteger();
+ private AtomicInteger onFailure = new AtomicInteger();
+
+ public void onComplete(Exchange exchange) {
+ onComplete.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ onFailure.incrementAndGet();
+ }
+
+ public int isOnComplete() {
+ return onComplete.get();
+ }
+
+ public int isOnFailure() {
+ return onFailure.get();
+ }
+
+ }
+
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java?rev=955488&r1=955487&r2=955488&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java Thu Jun 17 06:13:40 2010
@@ -28,6 +28,7 @@ import org.apache.camel.impl.DefaultEndp
public class MyAsyncEndpoint extends DefaultEndpoint {
private String reply;
+ private long delay = 500;
public MyAsyncEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -52,4 +53,12 @@ public class MyAsyncEndpoint extends Def
public void setReply(String reply) {
this.reply = reply;
}
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java?rev=955488&r1=955487&r2=955488&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java Thu Jun 17 06:13:40 2010
@@ -50,9 +50,9 @@ public class MyAsyncProducer implements
public boolean process(final Exchange exchange, final AsyncCallback callback) {
executor.submit(new Callable<Object>() {
public Object call() throws Exception {
- LOG.info("Simulating a task which takes 2 sec to reply");
+ LOG.info("Simulating a task which takes " + endpoint.getDelay() + " millis to reply");
- Thread.sleep(2000);
+ Thread.sleep(endpoint.getDelay());
String reply = endpoint.getReply();
exchange.getOut().setBody(reply);