You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/18 12:34:58 UTC

svn commit: r955924 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/util/ test/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/async/

Author: davsclaus
Date: Fri Jun 18 10:34:57 2010
New Revision: 955924

URL: http://svn.apache.org/viewvc?rev=955924&view=rev
Log:
CAMEL-2723: seda component migrated to support async routing engine.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java
      - copied, changed from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Fri Jun 18 10:34:57 2010
@@ -18,17 +18,17 @@ package org.apache.camel.component.seda;
 
 import java.util.Collection;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 
 /**
  * A simple {@link org.apache.camel.Producer} which just appends to a {@link Collection} the {@link Exchange} object.
  *
  * @version $Revision$
  */
-public class CollectionProducer extends DefaultProducer implements Processor {
+public class CollectionProducer extends DefaultAsyncProducer {
     protected final Collection<Exchange> queue;
 
     public CollectionProducer(Endpoint endpoint, Collection<Exchange> queue) {
@@ -36,9 +36,10 @@ public class CollectionProducer extends 
         this.queue = queue;
     }
 
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         Exchange copy = exchange.copy();
         queue.add(copy);
+        callback.done(true);
+        return true;
     }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Fri Jun 18 10:34:57 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.WaitForTaskToComplete;
@@ -42,7 +43,7 @@ public class SedaProducer extends Collec
     }
 
     @Override
-    public void process(final Exchange exchange) throws Exception {
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         // use a new copy of the exchange to route async and handover the on completion to the new copy
         // so its the new copy that performs the on completion callback when its done
         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
@@ -103,7 +104,12 @@ public class SedaProducer extends Collec
                     log.trace("Waiting for task to complete using timeout (ms): " + timeout + " at [" + endpoint.getEndpointUri() + "]");
                 }
                 // lets see if we can get the task done before the timeout
-                boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+                boolean done = false;
+                try {
+                    done = latch.await(timeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
                 if (!done) {
                     exchange.setException(new ExchangeTimedOutException(exchange, timeout));
                     // count down to indicate timeout
@@ -114,12 +120,21 @@ public class SedaProducer extends Collec
                     log.trace("Waiting for task to complete (blocking) at [" + endpoint.getEndpointUri() + "]");
                 }
                 // no timeout then wait until its done
-                latch.await();
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
             }
         } else {
             // no wait, eg its a InOnly then just add to queue and return
             queue.add(copy);
         }
+
+        // we use OnCompletion on the Exchange to callback and wait for the Exchange to be done
+        // so we should just signal the callback we are done synchronously
+        callback.done(true);
+        return true;
     }
 
     @Override

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java?rev=955924&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java Fri Jun 18 10:34:57 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * A default implementation of {@link org.apache.camel.Producer} for implementation inheritance,
+ * which can process {@link Exchange}s asynchronously.
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultAsyncProducer extends DefaultProducer implements AsyncProcessor {
+
+    public DefaultAsyncProducer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProducer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Fri Jun 18 10:34:57 2010
@@ -27,7 +27,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 /**
- * Helper methods for AsyncProcessor objects.
+ * Helper methods for {@link AsyncProcessor} objects.
  */
 public final class AsyncProcessorHelper {
 
@@ -43,8 +43,8 @@ public final class AsyncProcessorHelper 
     public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         boolean sync = processor.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                if (!sync) {
+            public void done(boolean doneSync) {
+                if (!doneSync) {
                     latch.countDown();
                 }
             }
@@ -61,7 +61,9 @@ public final class AsyncProcessorHelper 
      * @param processor the processor
      * @param exchange  the exchange
      * @return a future handle for the task being executed asynchronously
+     * @deprecated will be removed in Camel 2.5
      */
+    @Deprecated
     public static Future<Exchange> asyncProcess(final ExecutorService executor, final Processor processor, final Exchange exchange) {
         Callable<Exchange> task = new Callable<Exchange>() {
             public Exchange call() throws Exception {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Fri Jun 18 10:34:57 2010
@@ -288,7 +288,7 @@ public class DefaultProducerTemplateAsyn
 
         // produce it async so we use a helper
         Producer producer = endpoint.createProducer();
-        // normally you will use a shared exectutor service with pools
+        // normally you will use a shared executor service with pools
         ExecutorService executor = Executors.newSingleThreadExecutor();
         // send it async with the help of this helper
         Future<Exchange> future = AsyncProcessorHelper.asyncProcess(executor, producer, exchange);

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java?rev=955924&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java Fri Jun 18 10:34:57 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointSedaInOnlyTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private static String sedaThreadName;
+    private static String route = "";
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        route = "";
+    }
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        template.sendBody("direct:start", "Hello Camel");
+        // we should run before the async processor that sets B
+        route += "A";
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(sedaThreadName));
+        assertFalse("Should use different threads", afterThreadName.equalsIgnoreCase(sedaThreadName));
+
+        assertEquals("AB", route);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                        .to("mock:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:Bye Camel")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("seda:foo");
+
+                from("seda:foo")
+                        .to("mock:after")
+                        .to("log:after")
+                        .delay(1000)
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                route += "B";
+                                sedaThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaInOnlyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java (from r955852, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955852&r2=955924&rev=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSedaTest.java Fri Jun 18 10:34:57 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointSedaTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -51,7 +51,6 @@ public class AsyncEndpointTest extends C
 
                 from("direct:start")
                         .to("mock:before")
-                        .to("log:before")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 beforeThreadName = Thread.currentThread().getName();
@@ -63,11 +62,13 @@ public class AsyncEndpointTest extends C
                                 afterThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("log:after")
+                        .to("seda:foo");
+
+                from("seda:foo")
                         .to("mock:after")
                         .to("mock:result");
             }
         };
     }
 
-}
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java?rev=955924&r1=955923&r2=955924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java Fri Jun 18 10:34:57 2010
@@ -22,46 +22,41 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class MyAsyncProducer implements AsyncProcessor, Producer {
+public class MyAsyncProducer extends DefaultAsyncProducer {
 
     private static final Log LOG = LogFactory.getLog(MyAsyncProducer.class);
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
-    private final MyAsyncEndpoint endpoint;
     private final AtomicInteger counter = new AtomicInteger();
 
     public MyAsyncProducer(MyAsyncEndpoint endpoint) {
-        this.endpoint = endpoint;
+        super(endpoint);
     }
 
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
+    public MyAsyncEndpoint getEndpoint() {
+        return (MyAsyncEndpoint) super.getEndpoint();
     }
 
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         executor.submit(new Callable<Object>() {
             public Object call() throws Exception {
-                LOG.info("Simulating a task which takes " + endpoint.getDelay() + " millis to reply");
-                Thread.sleep(endpoint.getDelay());
+                LOG.info("Simulating a task which takes " + getEndpoint().getDelay() + " millis to reply");
+                Thread.sleep(getEndpoint().getDelay());
 
                 int count = counter.incrementAndGet();
-                if (endpoint.getFailFirstAttempts() >= count) {
+                if (getEndpoint().getFailFirstAttempts() >= count) {
                     LOG.info("Simulating a failure at attempt " + count);
                     exchange.setException(new CamelExchangeException("Simulated error at attempt " + count, exchange));
                 } else {
-                    String reply = endpoint.getReply();
+                    String reply = getEndpoint().getReply();
                     exchange.getOut().setBody(reply);
                     LOG.info("Setting reply " + reply);
                 }
@@ -77,29 +72,4 @@ public class MyAsyncProducer implements 
         return false;
     }
 
-    public MyAsyncEndpoint getEndpoint() {
-        return endpoint;
-    }
-
-    public Exchange createExchange() {
-        return new DefaultExchange(endpoint);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern) {
-        return new DefaultExchange(endpoint, pattern);
-    }
-
-    public Exchange createExchange(Exchange exchange) {
-        return new DefaultExchange(exchange);
-    }
-
-    public void start() throws Exception {
-    }
-
-    public void stop() throws Exception {
-    }
-
-    public boolean isSingleton() {
-        return true;
-    }
 }