You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/21 19:33:56 UTC

svn commit: r578206 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/component/vm/ test/java/org/apache/camel/component/file/

Author: chirino
Date: Fri Sep 21 10:33:54 2007
New Revision: 578206

URL: http://svn.apache.org/viewvc?rev=578206&view=rev
Log:
Seda exchange once again completes an exchange when a producer enqueues its message on the seda queue. Use the thread(x) DSL method if you want to do async processing of an exchange that will be completed later.

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Fri Sep 21 10:33:54 2007
@@ -31,8 +31,8 @@
  * @version $Revision: 1.1 $
  */
 public class SedaComponent extends DefaultComponent {
-    public BlockingQueue<SedaEndpoint.Entry> createQueue() {
-        return new LinkedBlockingQueue<SedaEndpoint.Entry>(1000);
+    public BlockingQueue<Exchange> createQueue() {
+        return new LinkedBlockingQueue<Exchange>(1000);
     }
 
     @Override

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Sep 21 10:33:54 2007
@@ -51,30 +51,17 @@
 
     public void run() {
         while (!isStopping()) {
-            final SedaEndpoint.Entry entry;
+            final Exchange exchange;
             try {
-                entry = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+                exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 break;
             }
-            if (entry != null && !isStopping()) {
-                processor.process(entry.getExchange(), new AsyncCallback() {
+            if (exchange != null && !isStopping()) {
+                processor.process(exchange, new AsyncCallback() {
                     public void done(boolean sync) {
-                        if (entry.getCallback() != null) {
-                            entry.getCallback().done(false);
-                        } else {
-                            Throwable e = entry.getExchange().getException();
-                            if (e != null) {
-                                if (e instanceof AlreadyStoppedException) {
-                                    LOG.debug("Ignoring failed message due to shutdown: " + e, e);
-                                } else {
-                                    LOG.error(e);
-                                }
-                            }
-                        }
                     }
                 });
-
             }
         }
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Fri Sep 21 10:33:54 2007
@@ -39,47 +39,24 @@
  * @version $Revision: 519973 $
  */
 public class SedaEndpoint extends DefaultEndpoint<Exchange> {
-    
-    static public class Entry {
-        Exchange exchange;
-        AsyncCallback callback;
         
-        public Entry(Exchange exchange, AsyncCallback callback) {
-            this.exchange = exchange;
-            this.callback = callback;
-        }
-        
-        public Exchange getExchange() {
-            return exchange;
-        }
-        public void setExchange(Exchange exchange) {
-            this.exchange = exchange;
-        }
-        public AsyncCallback getCallback() {
-            return callback;
-        }
-        public void setCallback(AsyncCallback callback) {
-            this.callback = callback;
-        }
-        
-    }
-    
     private final class SedaProducer extends DefaultProducer implements AsyncProcessor {
         private SedaProducer(Endpoint endpoint) {
             super(endpoint);
         }
         public void process(Exchange exchange) {
-            queue.add(new Entry(createExchange(exchange), null));
+            queue.add(exchange.copy());
         }
         public boolean process(Exchange exchange, AsyncCallback callback) {
-            queue.add(new Entry(createExchange(exchange), callback));
-            return false;
+            queue.add(exchange.copy());
+            callback.done(true);
+            return true;
         }
     }
 
-    private BlockingQueue<Entry> queue;
+    private BlockingQueue<Exchange> queue;
 
-    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Entry> queue) {
+    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
         super(endpointUri, component);
         this.queue = queue;
     }
@@ -96,7 +73,7 @@
         return new SedaConsumer(this, processor);
     }
 
-    public BlockingQueue<Entry> getQueue() {
+    public BlockingQueue<Exchange> getQueue() {
         return queue;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Fri Sep 21 10:33:54 2007
@@ -24,7 +24,6 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.component.seda.SedaComponent;
 import org.apache.camel.component.seda.SedaEndpoint;
-import org.apache.camel.component.seda.SedaEndpoint.Entry;
 
 /**
  * An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
@@ -39,13 +38,13 @@
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-        BlockingQueue<SedaEndpoint.Entry> blockingQueue = getBlockingQueue(uri);
+        BlockingQueue<Exchange> blockingQueue = getBlockingQueue(uri);
         return new SedaEndpoint(uri, this, blockingQueue);
     }
 
-    protected BlockingQueue<Entry> getBlockingQueue(String uri) {
+    protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
         synchronized (queues) {
-            BlockingQueue<Entry> answer = queues.get(uri);
+            BlockingQueue<Exchange> answer = queues.get(uri);
             if (answer == null) {
                 answer = createQueue();
                 queues.put(uri, answer);

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java Fri Sep 21 10:33:54 2007
@@ -76,8 +76,8 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from(uri).to("seda:a");
-                from("seda:a").process(new Processor() {
+                from(uri).thread(1).to("direct:a");
+                from("direct:a").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         file.set((File)exchange.getIn().getBody());
                         // Simulate a processing delay..