You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/16 08:09:46 UTC

svn commit: r997593 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ test/java/org/apache/camel/component/seda/

Author: ningjiang
Date: Thu Sep 16 06:09:46 2010
New Revision: 997593

URL: http://svn.apache.org/viewvc?rev=997593&view=rev
Log:
CAMEL-3119 Seda endpoint now supports tracking multiple consumers at runtime

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Thu Sep 16 06:09:46 2010
@@ -51,7 +51,6 @@ public class SedaConsumer extends Servic
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
     private ExecutorService executor;
-    private MulticastProcessor multicast;
     private ExceptionHandler exceptionHandler;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
@@ -155,9 +154,9 @@ public class SedaConsumer extends Servic
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
             }
-
+           
             // use a multicast processor to process it
-            MulticastProcessor mp = getMulticastProcessor();
+            MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
 
             // and use the asynchronous routing engine to support it
             AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
@@ -175,22 +174,6 @@ public class SedaConsumer extends Servic
         }
     }
 
-    protected synchronized MulticastProcessor getMulticastProcessor() {
-        if (multicast == null) {
-            int size = endpoint.getConsumers().size();
-
-            List<Processor> processors = new ArrayList<Processor>(size);
-            for (SedaConsumer consumer : endpoint.getConsumers()) {
-                processors.add(consumer.getProcessor());
-            }
-
-            ExecutorService multicastExecutor = endpoint.getCamelContext().getExecutorServiceStrategy()
-                                                    .newFixedThreadPool(this, endpoint.getEndpointUri() + "(multicast)", size);
-            multicast = new MulticastProcessor(endpoint.getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
-        }
-        return multicast;
-    }
-
     protected void doStart() throws Exception {
         int poolSize = endpoint.getConcurrentConsumers();
         executor = endpoint.getCamelContext().getExecutorServiceStrategy()
@@ -206,11 +189,6 @@ public class SedaConsumer extends Servic
         // must shutdown executor on stop to avoid overhead of having them running
         endpoint.getCamelContext().getExecutorServiceStrategy().shutdown(executor);
         executor = null;
-
-        if (multicast != null) {
-            ServiceHelper.stopService(multicast);
-            multicast = null;
-        }
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Sep 16 06:09:46 2010
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Component;
@@ -32,6 +33,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.BrowsableEndpoint;
 
 /**
@@ -45,11 +47,13 @@ public class SedaEndpoint extends Defaul
     private volatile BlockingQueue<Exchange> queue;
     private int size;
     private int concurrentConsumers = 1;
+    private volatile ExecutorService multicastExecutor;
     private boolean multipleConsumers;
     private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
     private long timeout = 30000;
     private volatile Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
     private volatile Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
+    private volatile MulticastProcessor conumserMulticastProcessor;
 
     public SedaEndpoint() {
     }
@@ -95,6 +99,30 @@ public class SedaEndpoint extends Defaul
         return queue;
     }
     
+    protected synchronized MulticastProcessor getConumserMulticastProcessor() {
+        return conumserMulticastProcessor; // assigned only in updateMulticastProcessor(); null before its first call
+    }
+    
+    /** Rebuilds the multicast processor from the currently registered consumers; called from onStarted and onStopped. */
+    protected synchronized void updateMulticastProcessor() {
+        int size = getConsumers().size();
+        if (size == 0 && multicastExecutor != null) {
+            // last consumer gone: stop the pool; the null check covers a stop that was never preceded by a start
+            multicastExecutor.shutdown();
+            multicastExecutor = null;
+        }
+        if (size > 0 && multicastExecutor == null) {
+            // consumers are added outside this lock, so size can already exceed 1 the first time we get here
+            multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)");
+        }
+        List<Processor> processors = new ArrayList<Processor>(size);
+        for (SedaConsumer consumer : getConsumers()) {
+            processors.add(consumer.getProcessor());
+        }
+        conumserMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
+    }
+
+
     public void setQueue(BlockingQueue<Exchange> queue) {
         this.queue = queue;
         this.size = queue.remainingCapacity();
@@ -179,10 +207,12 @@ public class SedaEndpoint extends Defaul
 
     void onStarted(SedaConsumer consumer) {
         consumers.add(consumer);
+        updateMulticastProcessor(); // rebuild the multicast processor so it includes the newly added consumer
     }
 
     void onStopped(SedaConsumer consumer) {
         consumers.remove(consumer);
+        updateMulticastProcessor(); // rebuild the multicast processor without the removed consumer
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java Thu Sep 16 06:09:46 2010
@@ -35,8 +35,9 @@ public class SedaEndpointTest extends Co
 
     public void testSedaEndpointUnboundedQueue() throws Exception {
         BlockingQueue<Exchange> unbounded = new LinkedBlockingQueue<Exchange>();
-        SedaEndpoint seda = new SedaEndpoint("seda://foo", unbounded);
+        SedaEndpoint seda = new SedaEndpoint("seda://foo", unbounded);        
         assertNotNull(seda);
+        seda.setCamelContext(context);
 
         assertEquals(Integer.MAX_VALUE, seda.getSize());
         assertSame(unbounded, seda.getQueue());
@@ -60,6 +61,7 @@ public class SedaEndpointTest extends Co
     public void testSedaEndpoint() throws Exception {
         SedaEndpoint seda = new SedaEndpoint("seda://foo", queue);
         assertNotNull(seda);
+        seda.setCamelContext(context);
 
         assertEquals(1000, seda.getSize());
         assertSame(queue, seda.getQueue());
@@ -83,6 +85,7 @@ public class SedaEndpointTest extends Co
     public void testSedaEndpointTwo() throws Exception {
         SedaEndpoint seda = new SedaEndpoint("seda://foo", queue, 2);
         assertNotNull(seda);
+        seda.setCamelContext(context);
 
         assertEquals(1000, seda.getSize());
         assertSame(queue, seda.getQueue());
@@ -106,6 +109,8 @@ public class SedaEndpointTest extends Co
     public void testSedaEndpointSetQueue() throws Exception {
         SedaEndpoint seda = new SedaEndpoint();
         assertNotNull(seda);
+        seda.setCamelContext(context);
+        seda.setEndpointUriIfNotSpecified("seda://bar");
         assertNotNull(seda.getQueue());
         // overwrite with a new queue
         seda.setQueue(new ArrayBlockingQueue<Exchange>(1000));

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java Thu Sep 16 06:09:46 2010
@@ -18,6 +18,7 @@ package org.apache.camel.component.seda;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
@@ -33,6 +34,50 @@ public class SedaMultipleConsumersTest e
 
         assertMockEndpointsSatisfied();
     }
+    
+    public void testSedaMultipleConsumersNewAdded() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+        // phase 1: the extra route does not exist yet, so mock:c must stay empty
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        assertMockEndpointsSatisfied();
+        // phase 2: add another consumer on seda:foo at runtime that routes to mock:c
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?multipleConsumers=true").id("testRoute").to("mock:c");
+
+            }
+
+        });
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(20);
+        getMockEndpoint("mock:b").expectedMessageCount(20);
+        getMockEndpoint("mock:c").expectedMessageCount(20);
+        // 10 iterations x 2 sends = 20 exchanges; each mock, including the new mock:c, expects all 20
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:foo", "Hello World");
+            template.sendBody("seda:bar", "Bye World");
+        }
+        assertMockEndpointsSatisfied();
+        resetMocks();
+        // phase 3: suspend the added route; mock:c expects nothing while mock:a and mock:b still expect all 20
+        context.suspendRoute("testRoute");
+        getMockEndpoint("mock:a").expectedMessageCount(20);
+        getMockEndpoint("mock:b").expectedMessageCount(20);
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:foo", "Hello World");
+            template.sendBody("seda:bar", "Bye World");
+        }
+        assertMockEndpointsSatisfied();
+    }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {