You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ro...@apache.org on 2009/02/09 22:18:16 UTC
svn commit: r742739 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/
test/java/org/apache/camel/component/seda/
Author: romkal
Date: Mon Feb 9 21:18:15 2009
New Revision: 742739
URL: http://svn.apache.org/viewvc?rev=742739&view=rev
Log:
CAMEL-1297: Seda component can configure concurrent consumers parameter
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=742739&r1=742738&r2=742739&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Mon Feb 9 21:18:15 2009
@@ -39,6 +39,7 @@
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
- return new SedaEndpoint(uri, this, parameters);
+ int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1);
+ return new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=742739&r1=742738&r2=742739&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Feb 9 21:18:15 2009
@@ -17,6 +17,10 @@
package org.apache.camel.component.seda;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
@@ -39,7 +43,7 @@
private SedaEndpoint endpoint;
private AsyncProcessor processor;
- private Thread thread;
+ private ExecutorService executor;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
@@ -88,14 +92,23 @@
}
protected void doStart() throws Exception {
- thread = new Thread(this, getThreadName(endpoint.getEndpointUri()));
- thread.setDaemon(true);
- thread.start();
+ int concurrentConsumers = endpoint.getConcurrentConsumers();
+ executor = Executors.newFixedThreadPool(concurrentConsumers, new ThreadFactory() {
+
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, getThreadName(endpoint.getEndpointUri()));
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ for (int i = 0; i < concurrentConsumers; i++) {
+ executor.execute(this);
+ }
}
protected void doStop() throws Exception {
- thread.join();
- thread = null;
+ executor.shutdownNow();
+ executor = null;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=742739&r1=742738&r2=742739&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Feb 9 21:18:15 2009
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.camel.Component;
@@ -40,24 +41,31 @@
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
private BlockingQueue<Exchange> queue;
private int size = 1000;
+ private int concurrentConsumers = 1;
public SedaEndpoint() {
}
public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
+ this(endpointUri, component, queue, 1);
+ }
+
+ public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
super(endpointUri, component);
this.queue = queue;
+ this.concurrentConsumers = concurrentConsumers;
}
- public SedaEndpoint(String uri, SedaComponent component, Map parameters) {
- this(uri, component, component.createQueue(uri, parameters));
+ public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
+ this(endpointUri, queue, 1);
}
- public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
+ public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue, int concurrentConsumers) {
super(endpointUri);
this.queue = queue;
+ this.concurrentConsumers = concurrentConsumers;
}
-
+
public Producer createProducer() throws Exception {
return new CollectionProducer(this, getQueue());
}
@@ -72,7 +80,7 @@
}
return queue;
}
-
+
public void setQueue(BlockingQueue<Exchange> queue) {
this.queue = queue;
}
@@ -85,6 +93,14 @@
this.size = size;
}
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
public boolean isSingleton() {
return true;
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java?rev=742739&r1=742738&r2=742739&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java Mon Feb 9 21:18:15 2009
@@ -33,4 +33,9 @@
LinkedBlockingQueue blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue);
assertEquals("remainingCapacity", 2000, blockingQueue.remainingCapacity());
}
+
+ public void testConcurrentConsumersConfigured() {
+ SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:foo?concurrentConsumers=5", SedaEndpoint.class);
+ assertEquals("concurrentConsumers", 5, endpoint.getConcurrentConsumers());
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java?rev=742739&r1=742738&r2=742739&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java Mon Feb 9 21:18:15 2009
@@ -22,6 +22,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -31,6 +32,7 @@
*/
public class SedaConsumerStartStopTest extends ContextTestSupport {
+ private static final String SEDA_QUEUE_CONSUMERS_5 = "seda:queue?concurrentConsumers=5";
private PollingConsumer consumer;
public boolean isUseRouteBuilder() {
@@ -86,4 +88,29 @@
assertMockEndpointsSatisfied();
}
+
+ public void testConcurrentConsumers() throws Exception {
+ context.addRoutes(new RouteBuilder(context) {
+
+ @Override
+ public void configure() throws Exception {
+ from(SEDA_QUEUE_CONSUMERS_5).delay(500).to("mock:result");
+
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(10);
+
+ for (int i = 0; i < 10; i++) {
+ sendBody(SEDA_QUEUE_CONSUMERS_5, i);
+ }
+
+ Thread.sleep(800);
+ assertEquals(5, mock.getReceivedCounter());
+
+ Thread.sleep(700);
+ assertEquals(10, mock.getReceivedCounter());
+ }
}