You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/07/16 14:14:25 UTC
svn commit: r794648 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/mock/
main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/
test/java/org/apache/camel/component/seda/
Author: davsclaus
Date: Thu Jul 16 12:14:24 2009
New Revision: 794648
URL: http://svn.apache.org/viewvc?rev=794648&view=rev
Log:
CAMEL-1835: added timeout option to seda producer.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Jul 16 12:14:24 2009
@@ -81,7 +81,6 @@
private Object propertyValue;
private Object actualProperty;
private Processor reporter;
- private int collectMaximumExchanges = -1;
public MockEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -768,14 +767,6 @@
this.reporter = reporter;
}
- public int getCollectMaximumExchanges() {
- return collectMaximumExchanges;
- }
-
- public void setCollectMaximumExchanges(int collectMaximumExchanges) {
- this.collectMaximumExchanges = collectMaximumExchanges;
- }
-
// Implementation methods
// -------------------------------------------------------------------------
private void init() {
@@ -792,7 +783,6 @@
expectedMinimumCount = -1;
expectedBodyValues = null;
actualBodyValues = new ArrayList();
- collectMaximumExchanges = -1;
}
protected synchronized void onExchange(Exchange exchange) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Jul 16 12:14:24 2009
@@ -45,6 +45,7 @@
private int size = 1000;
private int concurrentConsumers = 1;
private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
+ private long timeout = 30000;
private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
@@ -72,7 +73,7 @@
}
public Producer createProducer() throws Exception {
- return new SedaProducer(this, getQueue(), getWaitForTaskToComplete());
+ return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout());
}
public Consumer createConsumer(Processor processor) throws Exception {
@@ -114,6 +115,14 @@
this.waitForTaskToComplete = waitForTaskToComplete;
}
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
public boolean isSingleton() {
return true;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Thu Jul 16 12:14:24 2009
@@ -18,8 +18,10 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.util.ExchangeHelper;
@@ -30,11 +32,13 @@
public class SedaProducer extends CollectionProducer {
private final SedaEndpoint endpoint;
private final WaitForTaskToComplete waitForTaskToComplete;
+ private final long timeout;
- public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete) {
+ public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) {
super(endpoint, queue);
this.endpoint = endpoint;
this.waitForTaskToComplete = waitForTaskToComplete;
+ this.timeout = timeout;
}
@Override
@@ -53,14 +57,6 @@
if (wait == WaitForTaskToComplete.Always
|| (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
- // only check for if there is a consumer if its the seda endpoint where we exepect a consumer in the same
- // camel context. If you use the vm component the consumer could be in another camel context.
- // for seda we want to check that a consumer exists otherwise we end up waiting forever for the response.
- if (endpoint.getEndpointUri().startsWith("seda") && endpoint.getConsumers().isEmpty()) {
- throw new IllegalStateException("Cannot send to endpoint: " + endpoint.getEndpointUri() + " as no consumers is registered."
- + " With no consumers we end up waiting forever for the reply, as there are no consumers to process our exchange: " + exchange);
- }
-
// latch that waits until we are complete
final CountDownLatch latch = new CountDownLatch(1);
@@ -78,7 +74,11 @@
});
queue.add(copy);
- latch.await();
+ // lets see if we can get the task done before the timeout
+ boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ if (!done) {
+ exchange.setException(new ExchangeTimedOutException(exchange, timeout));
+ }
} else {
// no wait, eg its a InOnly then just add to queue and return
queue.add(copy);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Thu Jul 16 12:14:24 2009
@@ -16,7 +16,6 @@
*/
package org.apache.camel.impl;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -31,7 +30,6 @@
import org.apache.camel.Message;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java?rev=794648&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java Thu Jul 16 12:14:24 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+public class SedaConcurrentTest extends ContextTestSupport {
+
+ public void testSedaConcurrentInOnly() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ for (int i = 0; i < 20; i++) {
+ template.sendBody("seda:foo", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSedaConcurrentInOnlyWithAsync() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ for (int i = 0; i < 20; i++) {
+ template.asyncSendBody("seda:foo", "Message " + i);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSedaConcurrentInOut() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.allMessages().body().startsWith("Bye");
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ ExecutorService executors = Executors.newFixedThreadPool(10);
+ List<Object> replies = new ArrayList<Object>(20);
+ for (int i = 0; i < 20; i++) {
+ final int num = i;
+ Object out = executors.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return template.requestBody("seda:bar", "Message " + num);
+ }
+ });
+ replies.add(out);
+ }
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(20, replies.size());
+ }
+
+ public void testSedaConcurrentInOutWithAsync() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.allMessages().body().startsWith("Bye");
+
+ // should at least take 3 sec
+ mock.setMinimumResultWaitTime(3000);
+
+ // use our own template that has a higher thread pool than default camel that uses 5
+ ProducerTemplate pt = new DefaultProducerTemplate(context, Executors.newFixedThreadPool(10));
+
+ List<Future> replies = new ArrayList<Future>(20);
+ for (int i = 0; i < 20; i++) {
+ Future<Object> out = pt.asyncRequestBody("seda:bar", "Message " + i);
+ replies.add(out);
+ }
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(20, replies.size());
+ for (int i = 0; i < 20; i++) {
+ String out = (String) replies.get(i).get();
+ assertTrue(out.startsWith("Bye"));
+ }
+
+ pt.stop();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo?concurrentConsumers=10")
+ .to("mock:before").delay(2000).to("mock:result");
+
+ from("seda:bar?concurrentConsumers=10")
+ .to("mock:before").delay(2000).transform(body().prepend("Bye ")).to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java?rev=794648&r1=794647&r2=794648&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java Thu Jul 16 12:14:24 2009
@@ -18,6 +18,7 @@
import org.apache.camel.CamelExecutionException;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.builder.RouteBuilder;
/**
@@ -33,9 +34,9 @@
public void testInOut() throws Exception {
try {
template.requestBody("direct:start", "Hello World");
+ fail("Should throw an exception");
} catch (CamelExecutionException e) {
- assertIsInstanceOf(IllegalStateException.class, e.getCause());
- assertTrue(e.getCause().getMessage().startsWith("Cannot send to endpoint: seda://foo as no consumers is registered."));
+ assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
}
}
@@ -44,7 +45,7 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("seda:foo");
+ from("direct:start").to("seda:foo?timeout=1000");
}
};
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java?rev=794648&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java Thu Jul 16 12:14:24 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SedaTimeoutTest extends ContextTestSupport {
+
+ public void testSedaNoTineout() throws Exception {
+ Future<String> out = template.asyncRequestBody("seda:foo", "World", String.class);
+ assertEquals("Bye World", out.get());
+ }
+
+ public void testSedaTineout() throws Exception {
+ Future<String> out = template.asyncRequestBody("seda:foo?timeout=1000", "World", String.class);
+ try {
+ out.get();
+ fail("Should have thrown an exception");
+ } catch (ExecutionException e) {
+ assertIsInstanceOf(CamelExecutionException.class, e.getCause());
+ assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause().getCause());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo").to("mock:before").delay(3000).transform(body().prepend("Bye ")).to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date