You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/02/26 15:13:21 UTC
svn commit: r1450179 - in /camel/trunk/camel-core: ./
src/main/java/org/apache/camel/component/direct/DirectProducer.java
src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
Author: davsclaus
Date: Tue Feb 26 14:13:21 2013
New Revision: 1450179
URL: http://svn.apache.org/r1450179
Log:
CAMEL-6102: direct-vm component - Should support the async routing engine
Modified:
camel/trunk/camel-core/ (props changed)
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
Propchange: camel/trunk/camel-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Feb 26 14:13:21 2013
@@ -2,6 +2,7 @@
.checkstyle
.ruleset
target
+classes
.settings
.classpath
.project
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=1450179&r1=1450178&r2=1450179&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java Tue Feb 26 14:13:21 2013
@@ -20,7 +20,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.slf4j.Logger;
@@ -31,9 +31,9 @@ import org.slf4j.LoggerFactory;
*
* @version
*/
-public class DirectProducer extends DefaultProducer implements AsyncProcessor {
+public class DirectProducer extends DefaultAsyncProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
- private DirectEndpoint endpoint;
+ private final DirectEndpoint endpoint;
public DirectProducer(DirectEndpoint endpoint) {
super(endpoint);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java?rev=1450179&r1=1450178&r2=1450179&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java Tue Feb 26 14:13:21 2013
@@ -16,33 +16,53 @@
*/
package org.apache.camel.component.directvm;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The direct-vm producer
*/
-public class DirectVmProducer extends DefaultProducer {
+public class DirectVmProducer extends DefaultAsyncProducer {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(DirectVmProducer.class);
+ private DirectVmEndpoint endpoint;
public DirectVmProducer(DirectVmEndpoint endpoint) {
super(endpoint);
+ this.endpoint = endpoint;
}
@Override
- public DirectVmEndpoint getEndpoint() {
- return (DirectVmEndpoint) super.getEndpoint();
+ public void process(Exchange exchange) throws Exception {
+ // send to consumer
+ DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
+ if (consumer == null) {
+ LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+ throw new CamelExchangeException("No consumers available on endpoint: " + endpoint, exchange);
+ } else {
+ consumer.getProcessor().process(exchange);
+ }
}
@Override
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
// send to consumer
- DirectVmConsumer consumer = getEndpoint().getComponent().getConsumer(getEndpoint());
+ DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
if (consumer == null) {
- log.warn("No consumers available on endpoint: " + getEndpoint() + " to process: " + exchange);
- throw new CamelExchangeException("No consumers available on endpoint: " + getEndpoint(), exchange);
+ LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+ exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + endpoint, exchange));
+ callback.done(true);
+ return true;
} else {
- consumer.getProcessor().process(exchange);
+ AsyncProcessor processor = AsyncProcessorConverterHelper.convert(consumer.getProcessor());
+ return AsyncProcessorHelper.process(processor, exchange, callback);
}
}
}