You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/07/16 14:14:25 UTC

svn commit: r794648 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/mock/ main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/seda/

Author: davsclaus
Date: Thu Jul 16 12:14:24 2009
New Revision: 794648

URL: http://svn.apache.org/viewvc?rev=794648&view=rev
Log:
CAMEL-1835: added timeout option to seda producer.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Jul 16 12:14:24 2009
@@ -81,7 +81,6 @@
     private Object propertyValue;
     private Object actualProperty;
     private Processor reporter;
-    private int collectMaximumExchanges = -1;
 
     public MockEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
@@ -768,14 +767,6 @@
         this.reporter = reporter;
     }
 
-    public int getCollectMaximumExchanges() {
-        return collectMaximumExchanges;
-    }
-
-    public void setCollectMaximumExchanges(int collectMaximumExchanges) {
-        this.collectMaximumExchanges = collectMaximumExchanges;
-    }
-
     // Implementation methods
     // -------------------------------------------------------------------------
     private void init() {
@@ -792,7 +783,6 @@
         expectedMinimumCount = -1;
         expectedBodyValues = null;
         actualBodyValues = new ArrayList();
-        collectMaximumExchanges = -1;
     }
 
     protected synchronized void onExchange(Exchange exchange) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Jul 16 12:14:24 2009
@@ -45,6 +45,7 @@
     private int size = 1000;
     private int concurrentConsumers = 1;
     private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
+    private long timeout = 30000;
     private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
     private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
 
@@ -72,7 +73,7 @@
     }
     
     public Producer createProducer() throws Exception {
-        return new SedaProducer(this, getQueue(), getWaitForTaskToComplete());
+        return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout());
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -114,6 +115,14 @@
         this.waitForTaskToComplete = waitForTaskToComplete;
     }
 
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
     public boolean isSingleton() {
         return true;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Thu Jul 16 12:14:24 2009
@@ -18,8 +18,10 @@
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.impl.SynchronizationAdapter;
 import org.apache.camel.util.ExchangeHelper;
@@ -30,11 +32,13 @@
 public class SedaProducer extends CollectionProducer {
     private final SedaEndpoint endpoint;
     private final WaitForTaskToComplete waitForTaskToComplete;
+    private final long timeout;
 
-    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete) {
+    public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
         super(endpoint, queue);
         this.endpoint = endpoint;
         this.waitForTaskToComplete = waitForTaskToComplete;
+        this.timeout = timeout;
     }
 
     @Override
@@ -53,14 +57,6 @@
         if (wait == WaitForTaskToComplete.Always
             || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
 
-            // only check for if there is a consumer if its the seda endpoint where we exepect a consumer in the same
-            // camel context. If you use the vm component the consumer could be in another camel context.
-            // for seda we want to check that a consumer exists otherwise we end up waiting forever for the response.
-            if (endpoint.getEndpointUri().startsWith("seda") && endpoint.getConsumers().isEmpty()) {
-                throw new IllegalStateException("Cannot send to endpoint: " + endpoint.getEndpointUri() + " as no consumers is registered."
-                    + " With no consumers we end up waiting forever for the reply, as there are no consumers to process our exchange: " + exchange);
-            }
-
             // latch that waits until we are complete
             final CountDownLatch latch = new CountDownLatch(1);
 
@@ -78,7 +74,11 @@
             });
 
             queue.add(copy);
-            latch.await();
+            // lets see if we can get the task done before the timeout
+            boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+            if (!done) {
+                exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+            }
         } else {
             // no wait, eg its a InOnly then just add to queue and return
             queue.add(copy);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Thu Jul 16 12:14:24 2009
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -31,7 +30,6 @@
 import org.apache.camel.Message;
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java?rev=794648&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java Thu Jul 16 12:14:24 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+public class SedaConcurrentTest extends ContextTestSupport {
+
+    public void testSedaConcurrentInOnly() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+
+        // should at least take 3 sec 
+        mock.setMinimumResultWaitTime(3000);
+
+        for (int i = 0; i < 20; i++) {
+            template.sendBody("seda:foo", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSedaConcurrentInOnlyWithAsync() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+
+        // should at least take 3 sec
+        mock.setMinimumResultWaitTime(3000);
+
+        for (int i = 0; i < 20; i++) {
+            template.asyncSendBody("seda:foo", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSedaConcurrentInOut() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        mock.allMessages().body().startsWith("Bye");
+
+        // should at least take 3 sec
+        mock.setMinimumResultWaitTime(3000);
+
+        ExecutorService executors = Executors.newFixedThreadPool(10);
+        List<Object> replies = new ArrayList<Object>(20);
+        for (int i = 0; i < 20; i++) {
+            final int num = i;
+            Object out = executors.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    return template.requestBody("seda:bar", "Message " + num);
+                }
+            });
+            replies.add(out);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(20, replies.size());
+    }
+
+    public void testSedaConcurrentInOutWithAsync() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        mock.allMessages().body().startsWith("Bye");
+
+        // should at least take 3 sec
+        mock.setMinimumResultWaitTime(3000);
+
+        // use our own template that has a higher thread pool than default camel that uses 5
+        ProducerTemplate pt = new DefaultProducerTemplate(context, Executors.newFixedThreadPool(10));
+
+        List<Future> replies = new ArrayList<Future>(20);
+        for (int i = 0; i < 20; i++) {
+            Future<Object> out = pt.asyncRequestBody("seda:bar", "Message " + i);
+            replies.add(out);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(20, replies.size());
+        for (int i = 0; i < 20; i++) {
+            String out = (String) replies.get(i).get();
+            assertTrue(out.startsWith("Bye"));
+        }
+
+        pt.stop();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?concurrentConsumers=10")
+                    .to("mock:before").delay(2000).to("mock:result");
+
+                from("seda:bar?concurrentConsumers=10")
+                    .to("mock:before").delay(2000).transform(body().prepend("Bye ")).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java Thu Jul 16 12:14:24 2009
@@ -18,6 +18,7 @@
 
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.builder.RouteBuilder;
 
 /**
@@ -33,9 +34,9 @@
     public void testInOut() throws Exception {
         try {
             template.requestBody("direct:start", "Hello World");
+            fail("Should throw an exception");
         } catch (CamelExecutionException e) {
-            assertIsInstanceOf(IllegalStateException.class, e.getCause());
-            assertTrue(e.getCause().getMessage().startsWith("Cannot send to endpoint: seda://foo as no consumers is registered."));
+            assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
         }
     }
 
@@ -44,7 +45,7 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("seda:foo");
+                from("direct:start").to("seda:foo?timeout=1000");
             }
         };
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java?rev=794648&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java Thu Jul 16 12:14:24 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SedaTimeoutTest extends ContextTestSupport {
+
+    public void testSedaNoTineout() throws Exception {
+        Future<String> out = template.asyncRequestBody("seda:foo", "World", String.class);
+        assertEquals("Bye World", out.get());
+    }
+
+    public void testSedaTineout() throws Exception {
+        Future<String> out = template.asyncRequestBody("seda:foo?timeout=1000", "World", String.class);
+        try {
+            out.get();
+            fail("Should have thrown an exception");
+        } catch (ExecutionException e) {
+            assertIsInstanceOf(CamelExecutionException.class, e.getCause());
+            assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause().getCause());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").to("mock:before").delay(3000).transform(body().prepend("Bye ")).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date