You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/12/15 11:50:55 UTC
svn commit: r890752 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/test/java/org/apache/camel/component/seda/
components/camel-spring/src/test/java/org/apache/camel/spring/example/
components/camel-spring/sr...
Author: davsclaus
Date: Tue Dec 15 10:50:54 2009
New Revision: 890752
URL: http://svn.apache.org/viewvc?rev=890752&view=rev
Log:
CAMEL-2272: Added support for multipleConsumers on seda endpoint for pub-sub style messaging.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java (with props)
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java
- copied, changed from r890306, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml
- copied, changed from r890306, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=890752&r1=890751&r2=890752&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Tue Dec 15 10:50:54 2009
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.seda;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -26,7 +28,9 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +46,7 @@
private SedaEndpoint endpoint;
private Processor processor;
private ExecutorService executor;
+ private Processor multicast;
private ExceptionHandler exceptionHandler;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
@@ -69,6 +74,10 @@
this.exceptionHandler = exceptionHandler;
}
+ public Processor getProcessor() {
+ return processor;
+ }
+
public void run() {
BlockingQueue<Exchange> queue = endpoint.getQueue();
while (queue != null && isRunAllowed()) {
@@ -84,7 +93,7 @@
if (exchange != null) {
if (isRunAllowed()) {
try {
- processor.process(exchange);
+ sendToConsumers(exchange);
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
@@ -105,6 +114,51 @@
}
}
+ /**
+ * Send the given {@link Exchange} to the consumer(s).
+ * <p/>
+ * If multiple consumers then they will each receive a copy of the Exchange.
+ * A multicast processor will send the exchange in parallel to the multiple consumers.
+ * <p/>
+ * If there is only a single consumer then its dispatched directly to it using same thread.
+ *
+ * @param exchange the exchange
+ * @throws Exception can be thrown if processing of the exchange failed
+ */
+ protected void sendToConsumers(Exchange exchange) throws Exception {
+ int size = endpoint.getConsumers().size();
+
+ // if there are multiple consumers then multicast to them
+ if (size > 1) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
+ }
+
+ // use a multicast processor to process it
+ Processor mp = getMulticastProcessor();
+ mp.process(exchange);
+ } else {
+ // use the regular processor
+ processor.process(exchange);
+ }
+ }
+
+ protected synchronized Processor getMulticastProcessor() {
+ if (multicast == null) {
+ int size = endpoint.getConsumers().size();
+
+ List<Processor> processors = new ArrayList<Processor>(size);
+ for (SedaConsumer consumer : endpoint.getConsumers()) {
+ processors.add(consumer.getProcessor());
+ }
+
+ ExecutorService multicastExecutor = ExecutorServiceHelper.newFixedThreadPool(size, endpoint.getEndpointUri() + "(multicast)", true);
+ multicast = new MulticastProcessor(processors, null, true, multicastExecutor, false, false);
+ }
+ return multicast;
+ }
+
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
@@ -118,6 +172,10 @@
endpoint.onStopped(this);
executor.shutdownNow();
executor = null;
+
+ if (multicast != null) {
+ ServiceHelper.stopServices(multicast);
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=890752&r1=890751&r2=890752&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Dec 15 10:50:54 2009
@@ -27,6 +27,7 @@
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
@@ -40,10 +41,11 @@
*
* @version $Revision$
*/
-public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
+public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
private volatile BlockingQueue<Exchange> queue;
private int size = 1000;
private int concurrentConsumers = 1;
+ private boolean multipleConsumers;
private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
private long timeout = 30000;
private volatile Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
@@ -123,6 +125,14 @@
this.timeout = timeout;
}
+ public boolean isMultipleConsumers() {
+ return multipleConsumers;
+ }
+
+ public void setMultipleConsumers(boolean multipleConsumers) {
+ this.multipleConsumers = multipleConsumers;
+ }
+
public boolean isSingleton() {
return true;
}
@@ -134,6 +144,10 @@
return new ArrayList<Exchange>(getQueue());
}
+ public boolean isMultipleConsumersSupported() {
+ return isMultipleConsumers();
+ }
+
/**
* Returns the current active consumers on this endpoint
*/
@@ -163,4 +177,5 @@
void onStopped(SedaConsumer consumer) {
consumers.remove(consumer);
}
+
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SedaMultipleConsumersTest extends ContextTestSupport {
+
+ public void testSedaMultipleConsumers() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+ getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo?multipleConsumers=true").to("mock:a");
+
+ from("seda:foo?multipleConsumers=true").to("mock:b");
+
+ from("seda:bar").to("seda:foo?multipleConsumers=true");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.example;
+
+import org.apache.camel.Consume;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+public class AnotherFooEventConsumer {
+
+ @EndpointInject(uri = "mock:result")
+ private ProducerTemplate destination;
+
+ @Consume(ref = "foo")
+ public void doSomething(String body) {
+ destination.sendBody("another" + body);
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.example;
+
+import org.apache.camel.Consume;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+// START SNIPPET: e1
+public class FooEventConsumer {
+
+ @EndpointInject(uri = "mock:result")
+ private ProducerTemplate destination;
+
+ @Consume(ref = "foo")
+ public void doSomething(String body) {
+ destination.sendBody("foo" + body);
+ }
+
+}
+// END SNIPPET: e1
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java (from r890306, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java&r1=890306&r2=890752&rev=890752&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java Tue Dec 15 10:50:54 2009
@@ -18,26 +18,31 @@
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
-
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @version $Revision$
*/
-public class SimpleRouteTest extends SpringTestSupport {
- protected Object expectedBody = "<hello>world!</hello>";
+public class FooEventRouteTest extends SpringTestSupport {
- public void testSimpleRoute() throws Exception {
+ public void testMultipleConsumersOnSeda() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- resultEndpoint.expectedBodiesReceived(expectedBody);
+ resultEndpoint.expectedBodiesReceivedInAnyOrder("fooA", "anotherA", "fooB", "anotherB");
- template.sendBody("direct:start", expectedBody);
+ template.sendBody("seda:foo", "A");
+ template.sendBody("seda:foo", "B");
resultEndpoint.assertIsSatisfied();
}
+
protected AbstractXmlApplicationContext createApplicationContext() {
- return new ClassPathXmlApplicationContext("org/apache/camel/spring/example/simpleRoute.xml");
+ return new ClassPathXmlApplicationContext("org/apache/camel/spring/example/fooEventRoute.xml");
+ }
+
+ @Override
+ protected int getExpectedRouteCount() {
+ return 0;
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml (from r890306, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml&r1=890306&r2=890752&rev=890752&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml Tue Dec 15 10:50:54 2009
@@ -22,13 +22,16 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <!-- START SNIPPET: example -->
- <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <to uri="mock:result"/>
- </route>
- </camelContext>
- <!-- END SNIPPET: example -->
+ <!-- START SNIPPET: e1 -->
+ <!-- define the consumers as spring beans -->
+ <bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/>
+
+ <bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <!-- define a shared endpoint which the consumers can refer to instead of using url -->
+ <endpoint id="foo" uri="seda:foo?multipleConsumers=true"/>
+ </camelContext>
+ <!-- END SNIPPET: e1 -->
</beans>