You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/12 14:27:18 UTC

svn commit: r933214 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/component/seda/ test/java/org/apache/camel/component/vm/ test/j...

Author: davsclaus
Date: Mon Apr 12 12:27:17 2010
New Revision: 933214

URL: http://svn.apache.org/viewvc?rev=933214&view=rev
Log:
CAMEL-2623: seda/vm now works better in async request/reply: the request thread, which waits for the reply, is now triggered at the right moment.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyChainedTest.java
      - copied, changed from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutBigChainedTest.java
      - copied, changed from r933190, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java
      - copied, changed from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTest.java
      - copied, changed from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastUnitOfWorkTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=933214&r1=933213&r2=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Mon Apr 12 12:27:17 2010
@@ -84,6 +84,13 @@ public class SedaProducer extends Collec
                 }
 
                 @Override
+                public boolean allowHandover() {
+                    // do not allow handover as we want the seda producer to have its completion triggered
+                    // at this point in the routing (at this leg), instead of at the very last (this ensures the timeout is honored)
+                    return false;
+                }
+
+                @Override
                 public String toString() {
                     return "onDone at [" + endpoint.getEndpointUri() + "]";
                 }
@@ -99,6 +106,8 @@ public class SedaProducer extends Collec
                 boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
                 if (!done) {
                     exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+                    // count down to indicate timeout
+                    latch.countDown();
                 }
             } else {
                 if (log.isTraceEnabled()) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=933214&r1=933213&r2=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Mon Apr 12 12:27:17 2010
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -28,6 +29,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Service;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.SynchronizationVetoable;
 import org.apache.camel.spi.TracedRouteNodes;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.EventHelper;
@@ -99,7 +101,6 @@ public class DefaultUnitOfWork implement
         if (synchronizations == null) {
             synchronizations = new ArrayList<Synchronization>();
         }
-        // must add to top of list so we run last added first (FILO)
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding synchronization " + synchronization);
         }
@@ -117,12 +118,29 @@ public class DefaultUnitOfWork implement
             return;
         }
 
-        for (Synchronization synchronization : synchronizations) {
-            target.addOnCompletion(synchronization);
-        }
+        Iterator<Synchronization> it = synchronizations.iterator();
+        while (it.hasNext()) {
+            Synchronization synchronization = it.next();
+
+            boolean handover = true;
+            if (synchronization instanceof SynchronizationVetoable) {
+                SynchronizationVetoable veto = (SynchronizationVetoable) synchronization;
+                handover = veto.allowHandover();
+            }
 
-        // clear this list as its handed over to the other exchange
-        this.synchronizations.clear();
+            if (handover) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Handover synchronization " + synchronization + " to Exchange: " + target);
+                }
+                target.addOnCompletion(synchronization);
+                // remove it if its handed over
+                it.remove();
+            } else {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Handover not allow for synchronization " + synchronization);
+                }
+            }
+        }
     }
 
     public void done(Exchange exchange) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java?rev=933214&r1=933213&r2=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java Mon Apr 12 12:27:17 2010
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.SynchronizationVetoable;
 
 /**
  * Simple {@link Synchronization} adapter with empty methods for easier overriding
@@ -25,7 +26,7 @@ import org.apache.camel.spi.Synchronizat
  *
  * @version $Revision$
  */
-public class SynchronizationAdapter implements Synchronization {
+public class SynchronizationAdapter implements SynchronizationVetoable {
 
     public void onComplete(Exchange exchange) {
         onDone(exchange);
@@ -38,4 +39,10 @@ public class SynchronizationAdapter impl
     public void onDone(Exchange exchange) {
         // noop
     }
+
+    public boolean allowHandover() {
+        // allow by default
+        return true;
+    }
+
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java?rev=933214&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java Mon Apr 12 12:27:17 2010
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * A vetoable {@link org.apache.camel.spi.Synchronization}.
+ * <p/>
+ * When using {@link org.apache.camel.spi.Synchronization} they are normally executed
+ * when the {@link org.apache.camel.Exchange} completes at the end. If the {@link org.apache.camel.Exchange}
+ * is processed asynchronously the {@link org.apache.camel.spi.Synchronization} will be handed
+ * over to the next thread. This ensures for example the file consumer will delete the processed file at the very
+ * end, when the {@link org.apache.camel.Exchange} has been completed successfully.
+ * <p/>
+ * However there may be situations where you do not want to hand over certain {@link org.apache.camel.spi.Synchronization},
+ * such as when doing asynchronous request/reply over SEDA or VM endpoints.
+ *
+ * @version $Revision$
+ */
+public interface SynchronizationVetoable extends Synchronization {
+
+    /**
+     * Whether or not handover of this synchronization is allowed.
+     * <p/>
+     * For example when an {@link org.apache.camel.Exchange} is being routed
+     * from one thread to another thread, such as using toAsync etc.
+     *
+     * @return <tt>true</tt> to allow handover, <tt>false</tt> to deny.
+     */
+    boolean allowHandover();
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/SynchronizationVetoable.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyChainedTest.java (from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyChainedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyChainedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java&r1=933163&r2=933214&rev=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyChainedTest.java Mon Apr 12 12:27:17 2010
@@ -24,15 +24,14 @@ import static org.apache.camel.language.
 /**
  * @version $Revision$
  */
-public class SedaInOutChainedTest extends ContextTestSupport {
+public class SedaInOnlyChainedTest extends ContextTestSupport {
 
-    public void testInOutSedaChained() throws Exception {
+    public void testInOnlySedaChained() throws Exception {
         getMockEndpoint("mock:a").expectedBodiesReceived("start");
         getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
         getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
 
-        String reply = template.requestBody("seda:a", "start", String.class);
-        assertEquals("start-a-b-c", reply);
+        template.sendBody("seda:a", "start");
 
         assertMockEndpointsSatisfied();
     }
@@ -42,12 +41,12 @@ public class SedaInOutChainedTest extend
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:a").to("mock:a").transform(simple("${body}-a")).to("seda:b");
+                from("seda:a").to("mock:a").setBody(simple("${body}-a")).to("seda:b");
 
-                from("seda:b").to("mock:b").transform(simple("${body}-b")).to("seda:c");
+                from("seda:b").to("mock:b").setBody(simple("${body}-b")).to("seda:c");
 
-                from("seda:c").to("mock:c").transform(simple("${body}-c"));
+                from("seda:c").to("mock:c").setBody(simple("${body}-c"));
             }
         };
     }
-}
+}
\ No newline at end of file

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutBigChainedTest.java (from r933190, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutBigChainedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutBigChainedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java&r1=933190&r2=933214&rev=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutBigChainedTest.java Mon Apr 12 12:27:17 2010
@@ -24,15 +24,20 @@ import static org.apache.camel.language.
 /**
  * @version $Revision$
  */
-public class SedaInOutChainedTest extends ContextTestSupport {
+public class SedaInOutBigChainedTest extends ContextTestSupport {
 
-    public void testInOutSedaChained() throws Exception {
+    public void testInOutBigSedaChained() throws Exception {
         getMockEndpoint("mock:a").expectedBodiesReceived("start");
         getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
         getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
+        getMockEndpoint("mock:d").expectedBodiesReceived("start-a-b-c");
+        getMockEndpoint("mock:e").expectedBodiesReceived("start-a-b-c-d");
+        getMockEndpoint("mock:f").expectedBodiesReceived("start-a-b-c-d-e");
+        getMockEndpoint("mock:g").expectedBodiesReceived("start-a-b-c-d-e-f");
+        getMockEndpoint("mock:h").expectedBodiesReceived("start-a-b-c-d-e-f-g");
 
         String reply = template.requestBody("seda:a", "start", String.class);
-        assertEquals("start-a-b-c", reply);
+        assertEquals("start-a-b-c-d-e-f-g-h", reply);
 
         assertMockEndpointsSatisfied();
     }
@@ -46,8 +51,18 @@ public class SedaInOutChainedTest extend
 
                 from("seda:b").to("mock:b").transform(simple("${body}-b")).to("seda:c");
 
-                from("seda:c").to("mock:c").transform(simple("${body}-c"));
+                from("seda:c").to("mock:c").transform(simple("${body}-c")).to("seda:d");
+
+                from("seda:d").to("mock:d").transform(simple("${body}-d")).to("seda:e");
+
+                from("seda:e").to("mock:e").transform(simple("${body}-e")).to("seda:f");
+
+                from("seda:f").to("mock:f").transform(simple("${body}-f")).to("seda:g");
+
+                from("seda:g").to("mock:g").transform(simple("${body}-g")).to("seda:h");
+
+                from("seda:h").to("mock:h").transform(simple("${body}-h"));
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java?rev=933214&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java Mon Apr 12 12:27:17 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version $Revision$
+ */
+public class SedaInOutChainedTimeoutTest extends ContextTestSupport {
+
+    public void testSedaInOutChainedTimeout() throws Exception {
+        // the timeout after 2 sec should trigger an immediate reply
+        StopWatch watch = new StopWatch();
+        try {
+            template.requestBody("seda:a?timeout=5000", "Hello World");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+        long delta = watch.stop();
+
+        assertTrue("Should be faster than 3000 millis, was: " + delta, delta < 3000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(noErrorHandler());
+
+                from("seda:a")
+                        .to("mock:a")
+                        // this timeout will trigger an exception to occur
+                        .to("seda:b?timeout=2000")
+                        .to("mock:a2");
+
+                from("seda:b")
+                        .to("mock:b")
+                        .delay(3000)
+                        .transform().constant("Bye World");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java (from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java&r1=933163&r2=933214&rev=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java Mon Apr 12 12:27:17 2010
@@ -17,19 +17,23 @@
 package org.apache.camel.component.seda;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.SynchronizationAdapter;
 
 import static org.apache.camel.language.simple.SimpleLanguage.simple;
 
 /**
  * @version $Revision$
  */
-public class SedaInOutChainedTest extends ContextTestSupport {
+public class SedaInOutChainedWithOnCompletionTest extends ContextTestSupport {
 
-    public void testInOutSedaChained() throws Exception {
+    public void testInOutSedaChainedWithCustomOnCompletion() throws Exception {
         getMockEndpoint("mock:a").expectedBodiesReceived("start");
         getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
-        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
+        // the onCustomCompletion should be sent very last (as it will be handed over)
+        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b", "onCustomCompletion");
 
         String reply = template.requestBody("seda:a", "start", String.class);
         assertEquals("start-a-b-c", reply);
@@ -42,7 +46,17 @@ public class SedaInOutChainedTest extend
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:a").to("mock:a").transform(simple("${body}-a")).to("seda:b");
+                from("seda:a").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        // should come in last
+                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                            @Override
+                            public void onDone(Exchange exchange) {
+                                template.sendBody("mock:c", "onCustomCompletion");
+                            }
+                        });
+                    }
+                }).to("mock:a").transform(simple("${body}-a")).to("seda:b");
 
                 from("seda:b").to("mock:b").transform(simple("${body}-b")).to("seda:c");
 
@@ -50,4 +64,4 @@ public class SedaInOutChainedTest extend
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java?rev=933214&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java Mon Apr 12 12:27:17 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+import static org.apache.camel.language.simple.SimpleLanguage.simple;
+
+/**
+ * @version $Revision$
+ */
+public class VmInOnlyChainedTest extends ContextTestSupport {
+
+    public void testInOnlyVmChained() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("start");
+        getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
+        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
+
+        template.sendBody("vm:a", "start");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:a").to("mock:a").setBody(simple("${body}-a")).to("vm:b");
+
+                from("vm:b").to("mock:b").setBody(simple("${body}-b")).to("vm:c");
+
+                from("vm:c").to("mock:c").setBody(simple("${body}-c"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOnlyChainedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTest.java (from r933163, camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java&r1=933163&r2=933214&rev=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTest.java Mon Apr 12 12:27:17 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.seda;
+package org.apache.camel.component.vm;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -24,14 +24,14 @@ import static org.apache.camel.language.
 /**
  * @version $Revision$
  */
-public class SedaInOutChainedTest extends ContextTestSupport {
+public class VmInOutChainedTest extends ContextTestSupport {
 
-    public void testInOutSedaChained() throws Exception {
+    public void testInOutVmChained() throws Exception {
         getMockEndpoint("mock:a").expectedBodiesReceived("start");
         getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
         getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
 
-        String reply = template.requestBody("seda:a", "start", String.class);
+        String reply = template.requestBody("vm:a", "start", String.class);
         assertEquals("start-a-b-c", reply);
 
         assertMockEndpointsSatisfied();
@@ -42,12 +42,12 @@ public class SedaInOutChainedTest extend
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:a").to("mock:a").transform(simple("${body}-a")).to("seda:b");
+                from("vm:a").to("mock:a").transform(simple("${body}-a")).to("vm:b");
 
-                from("seda:b").to("mock:b").transform(simple("${body}-b")).to("seda:c");
+                from("vm:b").to("mock:b").transform(simple("${body}-b")).to("vm:c");
 
-                from("seda:c").to("mock:c").transform(simple("${body}-c"));
+                from("vm:c").to("mock:c").transform(simple("${body}-c"));
             }
         };
     }
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java?rev=933214&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java Mon Apr 12 12:27:17 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version $Revision$
+ */
+public class VmInOutChainedTimeoutTest extends ContextTestSupport {
+
+    public void testVmInOutChainedTimeout() throws Exception {
+        // the timeout after 2 sec should trigger an immediate reply
+        StopWatch watch = new StopWatch();
+        try {
+            template.requestBody("vm:a?timeout=5000", "Hello World");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+        long delta = watch.stop();
+
+        assertTrue("Should be faster than 3000 millis, was: " + delta, delta < 3000);
+
+        Thread.sleep(2000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(noErrorHandler());
+
+                from("vm:a")
+                        .to("mock:a")
+                        // this timeout will trigger an exception to occur
+                        .to("vm:b?timeout=2000")
+                        .to("mock:a2");
+
+                from("vm:b")
+                        .to("mock:b")
+                        .delay(3000)
+                        .transform().constant("Bye World");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmInOutChainedTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java?rev=933214&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java Mon Apr 12 12:27:17 2010
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/** Checks that a caller requesting with timeout=4000 sees the timeout value 2000 raised inside the vm route.
+ * @version $Revision$
+ */
+public class VmTimeoutIssueTest extends ContextTestSupport {
+    // vm:start1 calls vm:end?timeout=2000 while vm:end delays by 3000, so the reported timeout should be 2000
+    public void testVmTimeoutWithAnotherVm() throws Exception {
+        try {
+            template.requestBody("vm:start1?timeout=4000", "Hello");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+    }
+    // vm:start2 runs a processor that itself throws ExchangeTimedOutException(exchange, 2000)
+    public void testVmTimeoutWithProcessor() throws Exception {
+        try {
+            template.requestBody("vm:start2?timeout=4000", "Hello");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+    }
+    // builds the three vm routes (start1, start2, end) used by the two tests in this class
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(noErrorHandler());
+                // start1: log, then a request to vm:end with a 2000 timeout, then log
+                from("vm:start1?timeout=4000")
+                        .to("log:AFTER_START1")
+                        .to("vm:end?timeout=2000")
+                        .to("log:AFTER_END");
+                // start2: log, then a processor that always throws, then log
+                from("vm:start2?timeout=4000")
+                        .to("log:AFTER_START2")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                // throwing this exception is meant to stop routing asap
+                                throw new ExchangeTimedOutException(exchange, 2000);
+                            }
+                        })
+                        .to("log:AFTER_PROCESSOR");
+                // end: delays by 3000, then transforms the message to the constant "Bye World"
+                from("vm:end")
+                    .delay(3000).transform().constant("Bye World");
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmTimeoutIssueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastUnitOfWorkTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastUnitOfWorkTest.java?rev=933214&r1=933213&r2=933214&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastUnitOfWorkTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastUnitOfWorkTest.java Mon Apr 12 12:27:17 2010
@@ -41,10 +41,9 @@ public class MulticastUnitOfWorkTest ext
 
         assertMockEndpointsSatisfied();
 
-        // the UoW is shared for multicast with direct
-        // so B should be the last
-        assertEquals("onCompleteB", sync);
-        assertEquals("onCompleteB", lastOne);
+        // will run B and then A, where A will be the last one
+        assertEquals("onCompleteA", sync);
+        assertEquals("onCompleteA", lastOne);
     }
 
     @Override