You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 12:34:58 UTC
svn commit: r955924 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/
main/java/org/apache/camel/util/ test/java/org/apache/camel/impl/
test/java/org/apache/camel/processor/async/
Author: davsclaus
Date: Fri Jun 18 10:34:57 2010
New Revision: 955924
URL: http://svn.apache.org/viewvc?rev=955924&view=rev
Log:
CAMEL-2723: Migrated the seda component to support the async routing engine.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java
- copied, changed from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Fri Jun 18 10:34:57 2010
@@ -18,17 +18,17 @@ package org.apache.camel.component.seda;
import java.util.Collection;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
/**
* A simple {@link org.apache.camel.Producer} which just appends to a {@link Collection} the {@link Exchange} object.
*
* @version $Revision$
*/
-public class CollectionProducer extends DefaultProducer implements Processor {
+public class CollectionProducer extends DefaultAsyncProducer {
protected final Collection<Exchange> queue;
public CollectionProducer(Endpoint endpoint, Collection<Exchange> queue) {
@@ -36,9 +36,10 @@ public class CollectionProducer extends
this.queue = queue;
}
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
Exchange copy = exchange.copy();
queue.add(copy);
+ callback.done(true);
+ return true;
}
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Fri Jun 18 10:34:57 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.WaitForTaskToComplete;
@@ -42,7 +43,7 @@ public class SedaProducer extends Collec
}
@Override
- public void process(final Exchange exchange) throws Exception {
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
// use a new copy of the exchange to route async and handover the on completion to the new copy
// so its the new copy that performs the on completion callback when its done
Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
@@ -103,7 +104,12 @@ public class SedaProducer extends Collec
log.trace("Waiting for task to complete using timeout (ms): " + timeout + " at [" + endpoint.getEndpointUri() + "]");
}
// lets see if we can get the task done before the timeout
- boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ boolean done = false;
+ try {
+ done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ }
if (!done) {
exchange.setException(new ExchangeTimedOutException(exchange, timeout));
// count down to indicate timeout
@@ -114,12 +120,21 @@ public class SedaProducer extends Collec
log.trace("Waiting for task to complete (blocking) at [" + endpoint.getEndpointUri() + "]");
}
// no timeout then wait until its done
- latch.await();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
} else {
// no wait, eg its a InOnly then just add to queue and return
queue.add(copy);
}
+
+ // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
+ // so we should just signal the callback we are done synchronously
+ callback.done(true);
+ return true;
}
@Override
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java?rev=955924&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java Fri Jun 18 10:34:57 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * A default implementation of {@link org.apache.camel.Producer} for implementation inheritance,
+ * which can process {@link Exchange}s asynchronously.
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultAsyncProducer extends DefaultProducer implements AsyncProcessor {
+
+ public DefaultAsyncProducer(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Fri Jun 18 10:34:57 2010
@@ -27,7 +27,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
- * Helper methods for AsyncProcessor objects.
+ * Helper methods for {@link AsyncProcessor} objects.
*/
public final class AsyncProcessorHelper {
@@ -43,8 +43,8 @@ public final class AsyncProcessorHelper
public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
boolean sync = processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- if (!sync) {
+ public void done(boolean doneSync) {
+ if (!doneSync) {
latch.countDown();
}
}
@@ -61,7 +61,9 @@ public final class AsyncProcessorHelper
* @param processor the processor
* @param exchange the exchange
* @return a future handle for the task being executed asynchronously
+ * @deprecated will be removed in Camel 2.5
*/
+ @Deprecated
public static Future<Exchange> asyncProcess(final ExecutorService executor, final Processor processor, final Exchange exchange) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Fri Jun 18 10:34:57 2010
@@ -288,7 +288,7 @@ public class DefaultProducerTemplateAsyn
// produce it async so we use a helper
Producer producer = endpoint.createProducer();
- // normally you will use a shared exectutor service with pools
+ // normally you will use a shared executor service with pools
ExecutorService executor = Executors.newSingleThreadExecutor();
// send it async with the help of this helper
Future<Exchange> future = AsyncProcessorHelper.asyncProcess(executor, producer, exchange);
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java?rev=955924&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java Fri Jun 18 10:34:57 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointSedaInOnlyTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+ private static String sedaThreadName;
+ private static String route = "";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ route = "";
+ }
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ template.sendBody("direct:start", "Hello Camel");
+ // we should run before the async processor that sets B
+ route += "A";
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+ assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+ assertEquals("AB", route);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("seda:foo");
+
+ from("seda:foo")
+ .to("mock:after")
+ .to("log:after")
+ .delay(1000)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ route += "B";
+ sedaThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java (from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955852&r2=955924&rev=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java Fri Jun 18 10:34:57 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointSedaTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -51,7 +51,6 @@ public class AsyncEndpointTest extends C
from("direct:start")
.to("mock:before")
- .to("log:before")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
beforeThreadName = Thread.currentThread().getName();
@@ -63,11 +62,13 @@ public class AsyncEndpointTest extends C
afterThreadName = Thread.currentThread().getName();
}
})
- .to("log:after")
+ .to("seda:foo");
+
+ from("seda:foo")
.to("mock:after")
.to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java Fri Jun 18 10:34:57 2010
@@ -22,46 +22,41 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*/
-public class MyAsyncProducer implements AsyncProcessor, Producer {
+public class MyAsyncProducer extends DefaultAsyncProducer {
private static final Log LOG = LogFactory.getLog(MyAsyncProducer.class);
private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final MyAsyncEndpoint endpoint;
private final AtomicInteger counter = new AtomicInteger();
public MyAsyncProducer(MyAsyncEndpoint endpoint) {
- this.endpoint = endpoint;
+ super(endpoint);
}
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
+ public MyAsyncEndpoint getEndpoint() {
+ return (MyAsyncEndpoint) super.getEndpoint();
}
public boolean process(final Exchange exchange, final AsyncCallback callback) {
executor.submit(new Callable<Object>() {
public Object call() throws Exception {
- LOG.info("Simulating a task which takes " + endpoint.getDelay() + " millis to reply");
- Thread.sleep(endpoint.getDelay());
+ LOG.info("Simulating a task which takes " + getEndpoint().getDelay() + " millis to reply");
+ Thread.sleep(getEndpoint().getDelay());
int count = counter.incrementAndGet();
- if (endpoint.getFailFirstAttempts() >= count) {
+ if (getEndpoint().getFailFirstAttempts() >= count) {
LOG.info("Simulating a failure at attempt " + count);
exchange.setException(new CamelExchangeException("Simulated error at attempt " + count, exchange));
} else {
- String reply = endpoint.getReply();
+ String reply = getEndpoint().getReply();
exchange.getOut().setBody(reply);
LOG.info("Setting reply " + reply);
}
@@ -77,29 +72,4 @@ public class MyAsyncProducer implements
return false;
}
- public MyAsyncEndpoint getEndpoint() {
- return endpoint;
- }
-
- public Exchange createExchange() {
- return new DefaultExchange(endpoint);
- }
-
- public Exchange createExchange(ExchangePattern pattern) {
- return new DefaultExchange(endpoint, pattern);
- }
-
- public Exchange createExchange(Exchange exchange) {
- return new DefaultExchange(exchange);
- }
-
- public void start() throws Exception {
- }
-
- public void stop() throws Exception {
- }
-
- public boolean isSingleton() {
- return true;
- }
}