You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/14 12:26:29 UTC

svn commit: r744482 - in /camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/test/java/org/apache/camel/component/seda/

Author: davsclaus
Date: Sat Feb 14 11:26:29 2009
New Revision: 744482

URL: http://svn.apache.org/viewvc?rev=744482&view=rev
Log:
Merged revision 742739 via svnmerge from
https://svn.apache.org/repos/asf/camel/trunk

........
  r742739 | romkal | 2009-02-09 22:18:15 +0100 (Mon, 09 Feb 2009) | 1 line
  
  CAMEL-1297: Seda component can configure concurrent consumers parameter
........

Modified:
    camel/branches/camel-1.x/   (props changed)
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 14 11:26:29 2009
@@ -1 +1 @@
-/camel/trunk:739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123
+/camel/trunk:739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=744482&r1=744481&r2=744482&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sat Feb 14 11:26:29 2009
@@ -39,6 +39,7 @@
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-        return new SedaEndpoint(uri, this, parameters);
+        int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1);
+        return new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers);
     }
 }

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=744482&r1=744481&r2=744482&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Sat Feb 14 11:26:29 2009
@@ -17,6 +17,9 @@
 package org.apache.camel.component.seda;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AsyncCallback;
@@ -39,7 +42,7 @@
 
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
-    private Thread thread;
+    private ExecutorService executor;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
@@ -88,14 +91,23 @@
     }
 
     protected void doStart() throws Exception {
-        thread = new Thread(this, getThreadName(endpoint.getEndpointUri()));
-        thread.setDaemon(true);
-        thread.start();
+        int concurrentConsumers = endpoint.getConcurrentConsumers();
+        executor = Executors.newFixedThreadPool(concurrentConsumers, new ThreadFactory() {
+        
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, getThreadName(endpoint.getEndpointUri()));
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+        for (int i = 0; i < concurrentConsumers; i++) {
+            executor.execute(this);
+        }
     }
 
     protected void doStop() throws Exception {
-        thread.join();
-        thread = null;
+        executor.shutdownNow();
+        executor = null;
     }
 
 }

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=744482&r1=744481&r2=744482&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sat Feb 14 11:26:29 2009
@@ -18,7 +18,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.camel.Component;
@@ -39,22 +38,29 @@
  */
 public class SedaEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
     private BlockingQueue<Exchange> queue;
+    private int concurrentConsumers = 1;
 
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
+        this(endpointUri, component, queue, 1);
+    }
+
+    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
         super(endpointUri, component);
         this.queue = queue;
+        this.concurrentConsumers = concurrentConsumers;
     }
 
-    public SedaEndpoint(String uri, SedaComponent component, Map parameters) {
-        this(uri, component, component.createQueue(uri, parameters));
+    public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
+        this(endpointUri, queue, 1);
     }
 
-    public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
+    public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue, int concurrentConsumers) {
         super(endpointUri);
         ObjectHelper.notNull(queue, "queue");
         this.queue = queue;
+        this.concurrentConsumers = concurrentConsumers;
     }
-
+    
     public Producer createProducer() throws Exception {
         return new CollectionProducer(this, getQueue());
     }
@@ -66,7 +72,15 @@
     public BlockingQueue<Exchange> getQueue() {
         return queue;
     }
-
+    
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+    
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+    
     public boolean isSingleton() {
         return true;
     }

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java?rev=744482&r1=744481&r2=744482&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConfigureTest.java Sat Feb 14 11:26:29 2009
@@ -33,4 +33,9 @@
         LinkedBlockingQueue blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue);
         assertEquals("remainingCapacity", 2000, blockingQueue.remainingCapacity());
     }
+    
+    public void testConcurrentConsumersConfigured() {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:foo?concurrentConsumers=5", SedaEndpoint.class);
+        assertEquals("concurrentConsumers", 5, endpoint.getConcurrentConsumers());
+    }
 }

Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java?rev=744482&r1=744481&r2=744482&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java (original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerStartStopTest.java Sat Feb 14 11:26:29 2009
@@ -22,6 +22,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
@@ -31,6 +32,7 @@
  */
 public class SedaConsumerStartStopTest extends ContextTestSupport {
 
+    private static final String SEDA_QUEUE_CONSUMERS_5 = "seda:queue?concurrentConsumers=5";
     private PollingConsumer consumer;
 
     public boolean isUseRouteBuilder() {
@@ -86,4 +88,29 @@
 
         assertMockEndpointsSatisfied();
     }
-}
\ No newline at end of file
+
+    public void testConcurrentConsumers() throws Exception {
+        context.addRoutes(new RouteBuilder(context) {
+        
+            @Override
+            public void configure() throws Exception {
+                from(SEDA_QUEUE_CONSUMERS_5).delayer(500).to("mock:result");
+        
+            }
+        });
+        context.start();
+        
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(10);
+        
+        for (int i = 0; i < 10; i++) {
+            sendBody(SEDA_QUEUE_CONSUMERS_5, i);
+        }
+        
+        Thread.sleep(800);
+        assertEquals(5, mock.getReceivedCounter());
+        
+        Thread.sleep(700);
+        assertEquals(10, mock.getReceivedCounter());
+    }
+}