You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2008/09/09 11:07:56 UTC

svn commit: r693409 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/ test/java/org/apache/camel/processor/

Author: gertv
Date: Tue Sep  9 02:07:54 2008
New Revision: 693409

URL: http://svn.apache.org/viewvc?rev=693409&view=rev
Log:
CAMEL-876: added streaming() to the splitter DSL to send messages on the fly as the data becomes available

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=693409&r1=693408&r2=693409&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Sep  9 02:07:54 2008
@@ -48,6 +48,8 @@
     private Boolean parallelProcessing;
     @XmlTransient
     private ThreadPoolExecutor threadPoolExecutor;
+    @XmlAttribute(required = false)
+    private Boolean streaming = false;
     
     public SplitterType() {
     }
@@ -80,7 +82,7 @@
             threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
         }
         return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
-                isParallelProcessing(), threadPoolExecutor);
+                isParallelProcessing(), threadPoolExecutor, streaming);
     }
     
     public AggregationStrategy getAggregationStrategy() {
@@ -98,6 +100,30 @@
     public void setParallelProcessing(boolean parallelProcessing) {
         this.parallelProcessing = parallelProcessing;
     }
+    
+    /**
+     * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
+     * This improves throughput and memory usage, but it has a drawback: 
+     * - the sent exchanges will no longer contain the {@link Splitter#SPLIT_SIZE} header property 
+     * 
+     * @return <code>true</code> if streaming is enabled, <code>false</code> if it is disabled or was never set
+     */
+    public boolean getStreaming() {
+        return streaming != null ? streaming : false;
+    }
+
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
+    }
+    
+    /**
+     * Enables streaming. 
+     * See {@link SplitterType#getStreaming()} for more information
+     */
+    public SplitterType streaming() {
+        setStreaming(true);
+        return this;
+    }
 
     public ThreadPoolExecutor getThreadPoolExecutor() {
         return threadPoolExecutor;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=693409&r1=693408&r2=693409&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Sep  9 02:07:54 2008
@@ -31,6 +31,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.converter.CollectionConverter;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
@@ -138,17 +139,19 @@
     public void process(Exchange exchange) throws Exception {
         Exchange result = null;
 
-        List<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
+        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
         
         // Parallel Processing the producer
         if (isParallelProcessing) {
-            Exchange[] exchanges = new Exchange[pairs.size()];
-            final CountDownLatch completedExchanges = new CountDownLatch(pairs.size());
+            //TODO: make a dynamic countdown latch to avoid having to convert back to list
+            List<ProcessorExchangePair> allPairs = CollectionConverter.toList(pairs);
+            Exchange[] exchanges = new Exchange[allPairs.size()];
+            final CountDownLatch completedExchanges = new CountDownLatch(allPairs.size());
             int i = 0;
             for (ProcessorExchangePair pair : pairs) {
                 Processor producer = pair.getProcessor();
                 exchanges[i] = pair.getExchange();
-                updateNewExchange(exchanges[i], i, pairs);
+                updateNewExchange(exchanges[i], i, allPairs);
                 ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback() {
                     public void done(boolean doneSynchronously) {
                         completedExchanges.countDown();
@@ -193,11 +196,11 @@
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
+    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) {
         // No updates needed
     }
 
-    protected List<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
         List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
         Processor[] processorsArray = processors.toArray(new Processor[processors.size()]);
         for (int i = 0; i < processorsArray.length; i++) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=693409&r1=693408&r2=693409&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Sep  9 02:07:54 2008
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.processor;
 
+import static org.apache.camel.util.ObjectHelper.notNull;
+
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -26,13 +29,10 @@
 import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.CollectionHelper;
 import org.apache.camel.util.ObjectHelper;
 
-import static org.apache.camel.util.ObjectHelper.notNull;
-
 /**
  * Implements a dynamic <a
  * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> pattern
@@ -46,17 +46,19 @@
     public static final String SPLIT_COUNTER = "org.apache.camel.splitCounter";
 
     private final Expression expression;
+    private final boolean streaming;
 
     public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
-        this(expression, destination, aggregationStrategy, false, null);
+        this(expression, destination, aggregationStrategy, false, null, false);
     }
 
     public Splitter(Expression expression, Processor destination,
             AggregationStrategy aggregationStrategy,
-            boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+            boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor, boolean streaming) {
         super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, threadPoolExecutor);
 
         this.expression = expression;
+        this.streaming = streaming;
         notNull(expression, "expression");
         notNull(destination, "destination");
     }
@@ -67,10 +69,47 @@
     }
 
     @Override
-    protected List<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
         Object value = expression.evaluate(exchange);
-        Integer collectionSize = CollectionHelper.size(value);
+        
+        if (streaming) {
+            return createProcessorExchangePairsIterable(exchange, value);
+        } else {
+            return createProcessorExchangePairsList(exchange, value);
+        }
+    }
+
+    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, Object value) {
+        final Iterator iterator = ObjectHelper.createIterator(value);
+        return new Iterable() {
+
+            public Iterator iterator() {
+                return new Iterator() {
+
+                    public boolean hasNext() {
+                        return iterator.hasNext();
+                    }
+
+                    public Object next() {
+                        Object part = iterator.next();
+                        Exchange newExchange = exchange.copy();
+                        Message in = newExchange.getIn();
+                        in.setBody(part);
+                        return new ProcessorExchangePair(getProcessors().iterator().next(), newExchange);
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException("remove is not supported by this iterator");
+                    }
+                };
+            }
+            
+        };
+    }
+
+    private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
         List<ProcessorExchangePair> result;
+        Integer collectionSize = CollectionHelper.size(value);
         if (collectionSize != null) {
             result = new ArrayList<ProcessorExchangePair>(collectionSize);
         } else {
@@ -88,8 +127,10 @@
     }
 
     @Override
-    protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
+    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> allPairs) {
         exchange.getIn().setHeader(SPLIT_COUNTER, i);
-        exchange.getIn().setHeader(SPLIT_SIZE, allPairs.size());
+        if (allPairs instanceof Collection) {
+            exchange.getIn().setHeader(SPLIT_SIZE, ((Collection) allPairs).size());
+        }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?rev=693409&r1=693408&r2=693409&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java Tue Sep  9 02:07:54 2008
@@ -271,8 +271,17 @@
 
     /**
      * Asserts that all the expectations of the Mock endpoints are valid
+     * 
+     * @deprecated use {@link #assertMockEndpointsSatisfied()} instead
      */
     protected void assertMockEndpointsSatisifed() throws InterruptedException {
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Asserts that all the expectations of the Mock endpoints are valid
+     */
+    protected void assertMockEndpointsSatisfied() throws InterruptedException {
         MockEndpoint.assertIsSatisfied(context);
     }
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=693409&r1=693408&r2=693409&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java Tue Sep  9 02:07:54 2008
@@ -137,12 +137,35 @@
         assertMessageHeader(out, "foo", "bar");
         assertEquals((Integer)5, result.getProperty("aggregated", Integer.class));
     }
+    
+    public void testSplitterWithStreaming() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+        resultEndpoint.expectedHeaderReceived("foo", "bar");
+
+        Exchange result = template.send("direct:streaming", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        for (Exchange exchange : resultEndpoint.getReceivedExchanges()) {
+            assertNotNull(exchange.getIn().getHeader(Splitter.SPLIT_COUNTER));
+            //this header can not be set when streaming is used
+            assertNull(exchange.getIn().getHeader(Splitter.SPLIT_SIZE));
+        }
+
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:seqential").splitter(body().tokenize(","), new UseLatestAggregationStrategy()).to("mock:result");
                 from("direct:parallel").splitter(body().tokenize(","), new MyAggregationStrategy(), true).to("mock:result");
+                from("direct:streaming").splitter(body().tokenize(",")).streaming().to("mock:result");
             }
         };
     }