You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/02/26 15:13:21 UTC

svn commit: r1450179 - in /camel/trunk/camel-core: ./ src/main/java/org/apache/camel/component/direct/DirectProducer.java src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java

Author: davsclaus
Date: Tue Feb 26 14:13:21 2013
New Revision: 1450179

URL: http://svn.apache.org/r1450179
Log:
CAMEL-6102: direct-vm component - Should support the async routing engine

Modified:
    camel/trunk/camel-core/   (props changed)
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java

Propchange: camel/trunk/camel-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Feb 26 14:13:21 2013
@@ -2,6 +2,7 @@
 .checkstyle
 .ruleset
 target
+classes
 .settings
 .classpath
 .project

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=1450179&r1=1450178&r2=1450179&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java Tue Feb 26 14:13:21 2013
@@ -20,7 +20,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.slf4j.Logger;
@@ -31,9 +31,9 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class DirectProducer extends DefaultProducer implements AsyncProcessor {
+public class DirectProducer extends DefaultAsyncProducer {
     private static final transient Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
-    private DirectEndpoint endpoint;
+    private final DirectEndpoint endpoint;
 
     public DirectProducer(DirectEndpoint endpoint) {
         super(endpoint);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java?rev=1450179&r1=1450178&r2=1450179&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java Tue Feb 26 14:13:21 2013
@@ -16,33 +16,53 @@
  */
 package org.apache.camel.component.directvm;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The direct-vm producer
  */
-public class DirectVmProducer extends DefaultProducer {
+public class DirectVmProducer extends DefaultAsyncProducer {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(DirectVmProducer.class);
+    private DirectVmEndpoint endpoint;
 
     public DirectVmProducer(DirectVmEndpoint endpoint) {
         super(endpoint);
+        this.endpoint = endpoint;
     }
 
     @Override
-    public DirectVmEndpoint getEndpoint() {
-        return (DirectVmEndpoint) super.getEndpoint();
+    public void process(Exchange exchange) throws Exception {
+        // send to consumer
+        DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
+        if (consumer == null) {
+            LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+            throw new CamelExchangeException("No consumers available on endpoint: " + endpoint, exchange);
+        } else {
+            consumer.getProcessor().process(exchange);
+        }
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         // send to consumer
-        DirectVmConsumer consumer = getEndpoint().getComponent().getConsumer(getEndpoint());
+        DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
         if (consumer == null) {
-            log.warn("No consumers available on endpoint: " + getEndpoint() + " to process: " + exchange);
-            throw new CamelExchangeException("No consumers available on endpoint: " + getEndpoint(), exchange);
+            LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+            exchange.setException(new CamelExchangeException("No consumers available on endpoint: " + endpoint, exchange));
+            callback.done(true);
+            return true;
         } else {
-            consumer.getProcessor().process(exchange);
+            AsyncProcessor processor = AsyncProcessorConverterHelper.convert(consumer.getProcessor());
+            return AsyncProcessorHelper.process(processor, exchange, callback);
         }
     }
 }