You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/12/15 11:50:55 UTC

svn commit: r890752 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/test/java/org/apache/camel/component/seda/ components/camel-spring/src/test/java/org/apache/camel/spring/example/ components/camel-spring/sr...

Author: davsclaus
Date: Tue Dec 15 10:50:54 2009
New Revision: 890752

URL: http://svn.apache.org/viewvc?rev=890752&view=rev
Log:
CAMEL-2272: Added support for multipleConsumers on seda endpoint for pub-sub style messaging.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java   (with props)
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java
      - copied, changed from r890306, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml
      - copied, changed from r890306, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=890752&r1=890751&r2=890752&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Tue Dec 15 10:50:54 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.seda;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -26,7 +28,9 @@
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +46,7 @@
     private SedaEndpoint endpoint;
     private Processor processor;
     private ExecutorService executor;
+    private Processor multicast;
     private ExceptionHandler exceptionHandler;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
@@ -69,6 +74,10 @@
         this.exceptionHandler = exceptionHandler;
     }
 
+    public Processor getProcessor() {
+        return processor;
+    }
+
     public void run() {
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         while (queue != null && isRunAllowed()) {
@@ -84,7 +93,7 @@
             if (exchange != null) {
                 if (isRunAllowed()) {
                     try {
-                        processor.process(exchange);
+                        sendToConsumers(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException(e);
                     }
@@ -105,6 +114,51 @@
         }
     }
 
+    /**
+     * Send the given {@link Exchange} to the consumer(s).
+     * <p/>
+     * If multiple consumers then they will each receive a copy of the Exchange.
+     * A multicast processor will send the exchange in parallel to the multiple consumers.
+     * <p/>
+     * If there is only a single consumer then its dispatched directly to it using same thread.
+     * 
+     * @param exchange the exchange
+     * @throws Exception can be thrown if processing of the exchange failed
+     */
+    protected void sendToConsumers(Exchange exchange) throws Exception {
+        int size = endpoint.getConsumers().size();
+
+        // if there are multiple consumers then multicast to them
+        if (size > 1) {
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Multicasting to " + endpoint.getConsumers().size() + " consumers for Exchange: " + exchange);
+            }
+
+            // use a multicast processor to process it
+            Processor mp = getMulticastProcessor();
+            mp.process(exchange);
+        } else {
+            // use the regular processor
+            processor.process(exchange);
+        }
+    }
+
+    protected synchronized Processor getMulticastProcessor() {
+        if (multicast == null) {
+            int size = endpoint.getConsumers().size();
+
+            List<Processor> processors = new ArrayList<Processor>(size);
+            for (SedaConsumer consumer : endpoint.getConsumers()) {
+                processors.add(consumer.getProcessor());
+            }
+
+            ExecutorService multicastExecutor = ExecutorServiceHelper.newFixedThreadPool(size, endpoint.getEndpointUri() + "(multicast)", true);
+            multicast = new MulticastProcessor(processors, null, true, multicastExecutor, false, false);
+        }
+        return multicast;
+    }
+
     protected void doStart() throws Exception {
         int poolSize = endpoint.getConcurrentConsumers();
         executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, endpoint.getEndpointUri(), true);
@@ -118,6 +172,10 @@
         endpoint.onStopped(this);
         executor.shutdownNow();
         executor = null;
+
+        if (multicast != null) {
+            ServiceHelper.stopServices(multicast);
+        }
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=890752&r1=890751&r2=890752&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Tue Dec 15 10:50:54 2009
@@ -27,6 +27,7 @@
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.WaitForTaskToComplete;
@@ -40,10 +41,11 @@
  *
  * @version $Revision$
  */
-public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
+public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
     private volatile BlockingQueue<Exchange> queue;
     private int size = 1000;
     private int concurrentConsumers = 1;
+    private boolean multipleConsumers;
     private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
     private long timeout = 30000;
     private volatile Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
@@ -123,6 +125,14 @@
         this.timeout = timeout;
     }
 
+    public boolean isMultipleConsumers() {
+        return multipleConsumers;
+    }
+
+    public void setMultipleConsumers(boolean multipleConsumers) {
+        this.multipleConsumers = multipleConsumers;
+    }
+
     public boolean isSingleton() {
         return true;
     }
@@ -134,6 +144,10 @@
         return new ArrayList<Exchange>(getQueue());
     }
 
+    public boolean isMultipleConsumersSupported() {
+        return isMultipleConsumers();
+    }
+
     /**
      * Returns the current active consumers on this endpoint
      */
@@ -163,4 +177,5 @@
     void onStopped(SedaConsumer consumer) {
         consumers.remove(consumer);
     }
+
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SedaMultipleConsumersTest extends ContextTestSupport {
+
+    public void testSedaMultipleConsumers() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?multipleConsumers=true").to("mock:a");
+
+                from("seda:foo?multipleConsumers=true").to("mock:b");
+
+                from("seda:bar").to("seda:foo?multipleConsumers=true");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaMultipleConsumersTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.example;
+
+import org.apache.camel.Consume;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+public class AnotherFooEventConsumer {
+
+    @EndpointInject(uri = "mock:result")
+    private ProducerTemplate destination;
+
+    @Consume(ref = "foo")
+    public void doSomething(String body) {
+        destination.sendBody("another" + body);
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/AnotherFooEventConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java?rev=890752&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java Tue Dec 15 10:50:54 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.example;
+
+import org.apache.camel.Consume;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+
+/**
+ * @version $Revision$
+ */
+// START SNIPPET: e1
+public class FooEventConsumer {
+
+    @EndpointInject(uri = "mock:result")
+    private ProducerTemplate destination;
+
+    @Consume(ref = "foo")
+    public void doSomething(String body) {
+        destination.sendBody("foo" + body);
+    }
+
+}
+// END SNIPPET: e1

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java (from r890306, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java&r1=890306&r2=890752&rev=890752&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/SimpleRouteTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/example/FooEventRouteTest.java Tue Dec 15 10:50:54 2009
@@ -18,26 +18,31 @@
 
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
-
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 /**
  * @version $Revision$
  */
-public class SimpleRouteTest extends SpringTestSupport {
-    protected Object expectedBody = "<hello>world!</hello>";
+public class FooEventRouteTest extends SpringTestSupport {
 
-    public void testSimpleRoute() throws Exception {
+    public void testMultipleConsumersOnSeda() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedBodiesReceived(expectedBody);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("fooA", "anotherA", "fooB", "anotherB");
 
-        template.sendBody("direct:start", expectedBody);
+        template.sendBody("seda:foo", "A");
+        template.sendBody("seda:foo", "B");
 
         resultEndpoint.assertIsSatisfied();
     }
+
     protected AbstractXmlApplicationContext createApplicationContext() {
-        return new ClassPathXmlApplicationContext("org/apache/camel/spring/example/simpleRoute.xml");
+        return new ClassPathXmlApplicationContext("org/apache/camel/spring/example/fooEventRoute.xml");
+    }
+
+    @Override
+    protected int getExpectedRouteCount() {
+        return 0;
     }
 
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml (from r890306, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml&r1=890306&r2=890752&rev=890752&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/simpleRoute.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/example/fooEventRoute.xml Tue Dec 15 10:50:54 2009
@@ -22,13 +22,16 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-  <!-- START SNIPPET: example -->
-  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
-    <route>
-      <from uri="direct:start"/>
-      <to uri="mock:result"/>
-    </route>
-  </camelContext>
-  <!-- END SNIPPET: example -->
+    <!-- START SNIPPET: e1 -->
+    <!-- define the consumers as spring beans -->
+    <bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/>
+
+    <bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/>
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <!-- define a shared endpoint which the consumers can refer to instead of using url -->
+        <endpoint id="foo" uri="seda:foo?multipleConsumers=true"/>
+    </camelContext>
+    <!-- END SNIPPET: e1 -->
 
 </beans>