You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/09/16 08:09:46 UTC
svn commit: r997593 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/
test/java/org/apache/camel/component/seda/
Author: ningjiang
Date: Thu Sep 16 06:09:46 2010
New Revision: 997593
URL: http://svn.apache.org/viewvc?rev=997593&view=rev
Log:
CAMEL-3119 Seda endpoint now supports tracking multiple consumers at runtime
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Thu Sep 16 06:09:46 2010
@@ -51,7 +51,6 @@ public class SedaConsumer extends Servic
private SedaEndpoint endpoint;
private AsyncProcessor processor;
private ExecutorService executor;
- private MulticastProcessor multicast;
private ExceptionHandler exceptionHandler;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
@@ -155,9 +154,9 @@ public class SedaConsumer extends Servic
if (LOG.isDebugEnabled()) {
LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
}
-
+
// use a multicast processor to process it
- MulticastProcessor mp = getMulticastProcessor();
+ MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
// and use the asynchronous routing engine to support it
AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
@@ -175,22 +174,6 @@ public class SedaConsumer extends Servic
}
}
- protected synchronized MulticastProcessor getMulticastProcessor() {
- if (multicast == null) {
- int size = endpoint.getConsumers().size();
-
- List<Processor> processors = new ArrayList<Processor>(size);
- for (SedaConsumer consumer : endpoint.getConsumers()) {
- processors.add(consumer.getProcessor());
- }
-
- ExecutorService multicastExecutor = endpoint.getCamelContext().getExecutorServiceStrategy()
- .newFixedThreadPool(this, endpoint.getEndpointUri() + "(multicast)", size);
- multicast = new MulticastProcessor(endpoint.getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
- }
- return multicast;
- }
-
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = endpoint.getCamelContext().getExecutorServiceStrategy()
@@ -206,11 +189,6 @@ public class SedaConsumer extends Servic
// must shutdown executor on stop to avoid overhead of having them running
endpoint.getCamelContext().getExecutorServiceStrategy().shutdown(executor);
executor = null;
-
- if (multicast != null) {
- ServiceHelper.stopService(multicast);
- multicast = null;
- }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Sep 16 06:09:46 2010
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.camel.Component;
@@ -32,6 +33,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.BrowsableEndpoint;
/**
@@ -45,11 +47,13 @@ public class SedaEndpoint extends Defaul
private volatile BlockingQueue<Exchange> queue;
private int size;
private int concurrentConsumers = 1;
+ private volatile ExecutorService multicastExecutor;
private boolean multipleConsumers;
private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
private long timeout = 30000;
private volatile Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
private volatile Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
+ private volatile MulticastProcessor conumserMulticastProcessor;
public SedaEndpoint() {
}
@@ -95,6 +99,30 @@ public class SedaEndpoint extends Defaul
return queue;
}
+ protected synchronized MulticastProcessor getConumserMulticastProcessor() {
+ return conumserMulticastProcessor;
+ }
+
+ protected synchronized void updateMulticastProcessor() {
+ int size = getConsumers().size();
+ if (size == 0) {
+ // stop the multicastExecutor
+ multicastExecutor.shutdown();
+ multicastExecutor = null;
+ }
+ if (size == 1 && multicastExecutor == null) {
+ multicastExecutor = getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, getEndpointUri() + "(multicast)");
+ }
+ List<Processor> processors = new ArrayList<Processor>(size);
+ for (SedaConsumer consumer : getConsumers()) {
+ processors.add(consumer.getProcessor());
+ }
+ conumserMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
+
+ }
+
+
+
public void setQueue(BlockingQueue<Exchange> queue) {
this.queue = queue;
this.size = queue.remainingCapacity();
@@ -179,10 +207,12 @@ public class SedaEndpoint extends Defaul
void onStarted(SedaConsumer consumer) {
consumers.add(consumer);
+ updateMulticastProcessor();
}
void onStopped(SedaConsumer consumer) {
consumers.remove(consumer);
+ updateMulticastProcessor();
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaEndpointTest.java Thu Sep 16 06:09:46 2010
@@ -35,8 +35,9 @@ public class SedaEndpointTest extends Co
public void testSedaEndpointUnboundedQueue() throws Exception {
BlockingQueue<Exchange> unbounded = new LinkedBlockingQueue<Exchange>();
- SedaEndpoint seda = new SedaEndpoint("seda://foo", unbounded);
+ SedaEndpoint seda = new SedaEndpoint("seda://foo", unbounded);
assertNotNull(seda);
+ seda.setCamelContext(context);
assertEquals(Integer.MAX_VALUE, seda.getSize());
assertSame(unbounded, seda.getQueue());
@@ -60,6 +61,7 @@ public class SedaEndpointTest extends Co
public void testSedaEndpoint() throws Exception {
SedaEndpoint seda = new SedaEndpoint("seda://foo", queue);
assertNotNull(seda);
+ seda.setCamelContext(context);
assertEquals(1000, seda.getSize());
assertSame(queue, seda.getQueue());
@@ -83,6 +85,7 @@ public class SedaEndpointTest extends Co
public void testSedaEndpointTwo() throws Exception {
SedaEndpoint seda = new SedaEndpoint("seda://foo", queue, 2);
assertNotNull(seda);
+ seda.setCamelContext(context);
assertEquals(1000, seda.getSize());
assertSame(queue, seda.getQueue());
@@ -106,6 +109,8 @@ public class SedaEndpointTest extends Co
public void testSedaEndpointSetQueue() throws Exception {
SedaEndpoint seda = new SedaEndpoint();
assertNotNull(seda);
+ seda.setCamelContext(context);
+ seda.setEndpointUriIfNotSpecified("seda://bar");
assertNotNull(seda.getQueue());
// overwrite with a new queue
seda.setQueue(new ArrayBlockingQueue<Exchange>(1000));
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java?rev=997593&r1=997592&r2=997593&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java Thu Sep 16 06:09:46 2010
@@ -18,6 +18,7 @@ package org.apache.camel.component.seda;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
@@ -33,6 +34,50 @@ public class SedaMultipleConsumersTest e
assertMockEndpointsSatisfied();
}
+
+ public void testSedaMultipleConsumersNewAdded() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+ getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+ getMockEndpoint("mock:c").expectedMessageCount(0);
+
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+
+ assertMockEndpointsSatisfied();
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo?multipleConsumers=true").id("testRoute").to("mock:c");
+
+ }
+
+ });
+ resetMocks();
+
+ getMockEndpoint("mock:a").expectedMessageCount(20);
+ getMockEndpoint("mock:b").expectedMessageCount(20);
+ getMockEndpoint("mock:c").expectedMessageCount(20);
+
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+ }
+ assertMockEndpointsSatisfied();
+ resetMocks();
+
+ context.suspendRoute("testRoute");
+ getMockEndpoint("mock:a").expectedMessageCount(20);
+ getMockEndpoint("mock:b").expectedMessageCount(20);
+ getMockEndpoint("mock:c").expectedMessageCount(0);
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+ }
+ assertMockEndpointsSatisfied();
+ }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {