You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/17 17:44:38 UTC

svn commit: r745139 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Tue Feb 17 16:44:38 2009
New Revision: 745139

URL: http://svn.apache.org/viewvc?rev=745139&view=rev
Log:
CAMEL-971: Introduced GroupedExchange for aggregating N exchanges into a single grouped exchange that holds all N exchanges.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java Tue Feb 17 16:44:38 2009
@@ -53,7 +53,7 @@
     /**
      * Header key holding file path to a local work directory containg a consumed file (if any)
      */
-    public static final String HEADER_FILE_LOCAL_WORK_PATH= "CamelFileLocalWorkPath";
+    public static final String HEADER_FILE_LOCAL_WORK_PATH = "CamelFileLocalWorkPath";
 
     protected GenericFileEndpoint<File> buildFileEndpoint(String uri, String remaining, Map parameters) throws Exception {
         File file = new File(remaining);

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java?rev=745139&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java Tue Feb 17 16:44:38 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+
+
+/**
+ * A grouped exchange that groups together other exchanges, as a holder object.
+ * <p/>
+ * This grouped exchange is usable by the aggregator so multiple exchanges can be grouped
+ * into this single exchange and thus only one exchange is sent for further processing.
+ */
+public class GroupedExchange extends DefaultExchange {
+
+    private List<Exchange> exchanges = new ArrayList<Exchange>();
+
+    public GroupedExchange(CamelContext context) {
+        super(context);
+    }
+
+    public GroupedExchange(CamelContext context, ExchangePattern pattern) {
+        super(context, pattern);
+    }
+
+    public GroupedExchange(Exchange parent) {
+        super(parent);
+    }
+
+    public GroupedExchange(Endpoint fromEndpoint) {
+        super(fromEndpoint);
+    }
+
+    public GroupedExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
+        super(fromEndpoint, pattern);
+    }
+
+    public List<Exchange> getExchanges() {
+        return exchanges;
+    }
+
+    public void setExchanges(List<Exchange> exchanges) {
+        this.exchanges = exchanges;
+    }
+
+    public void addExchange(Exchange exchange) {
+        this.exchanges.add(exchange);
+    }
+
+    public int size() {
+        return exchanges.size();
+    }
+
+    public Exchange get(int index) {
+        return exchanges.get(index);
+    }
+
+    @Override
+    public String toString() {
+        return "Exchange[Grouped with: " + exchanges.size() + " exchanges]";
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Tue Feb 17 16:44:38 2009
@@ -66,9 +66,12 @@
     private String strategyRef;
     @XmlAttribute(required = false)
     private String collectionRef;    
+    @XmlAttribute(required = false)
+    private Boolean groupExchanges;
     @XmlElement(name = "completedPredicate", required = false)
     private ExpressionSubElementType completedPredicate;
 
+
     public AggregatorType() {
     }
 
@@ -168,6 +171,10 @@
         if (outBatchSize != null) {
             aggregator.setOutBatchSize(outBatchSize);
         }
+
+        if (groupExchanges != null) {
+            aggregator.setGroupExchanges(groupExchanges);
+        }
         
         return aggregator;
     }
@@ -256,6 +263,14 @@
         return completedPredicate;
     }
 
+    public Boolean getGroupExchanges() {
+        return groupExchanges;
+    }
+
+    public void setGroupExchanges(Boolean groupExchanges) {
+        this.groupExchanges = groupExchanges;
+    }
+
     // Fluent API
     //-------------------------------------------------------------------------
 
@@ -337,6 +352,17 @@
     }
 
     /**
+     * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
+     * combined {@link org.apache.camel.impl.GroupedExchange} class holding all the aggregated exchanges.
+     *
+     * @return the builder
+     */
+    public AggregatorType groupExchanges() {
+        setGroupExchanges(true);
+        return this;
+    }
+
+    /**
      * Sets the predicate used to determine if the aggregation is completed
      *
      * @return the clause used to create the predicate
@@ -402,5 +428,5 @@
         if (isInheritErrorHandler()) {
             output.setErrorHandlerBuilder(getErrorHandlerBuilder());
         }
-    }    
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Feb 17 16:44:38 2009
@@ -22,6 +22,7 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.GroupedExchange;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExceptionHandler;
@@ -42,13 +43,14 @@
     private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
     private int batchSize = DEFAULT_BATCH_SIZE;
     private int outBatchSize;
+    private boolean groupExchanges;
 
     private Processor processor;
     private Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
 
     private BatchSender sender;
-    
+
     public BatchProcessor(Processor processor, Collection<Exchange> collection) {
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(collection, "collection");
@@ -112,6 +114,14 @@
         this.batchTimeout = batchTimeout;
     }
 
+    public boolean isGroupExchanges() {
+        return groupExchanges;
+    }
+
+    public void setGroupExchanges(boolean groupExchanges) {
+        this.groupExchanges = groupExchanges;
+    }
+
     public Processor getProcessor() {
         return processor;
     }
@@ -219,11 +229,27 @@
         }
         
         private void sendExchanges() throws Exception {
+            GroupedExchange grouped = null;
+
             Iterator<Exchange> iter = collection.iterator();
             while (iter.hasNext()) {
                 Exchange exchange = iter.next();
                 iter.remove();
-                processExchange(exchange);
+                if (!groupExchanges) {
+                    // non grouped so process the exchange one at a time
+                    processExchange(exchange);
+                } else {
+                    // grouped so add all exchanges into one group
+                    if (grouped == null) {
+                        grouped = new GroupedExchange(exchange.getContext());
+                    }
+                    grouped.addExchange(exchange);
+                }
+            }
+
+            // and after adding process the single grouped exchange
+            if (grouped != null) {
+                processExchange(grouped);
             }
         }
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=745139&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Tue Feb 17 16:44:38 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.GroupedExchange;
+
+/**
+ * Unit test for aggregate grouped exchanges.
+ */
+public class AggregateGroupedExchangeTest extends ContextTestSupport {
+
+    public void testGrouped() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 1 message since we group all we get in using the same correlation key
+        result.expectedMessageCount(1);
+
+        // then we send all the messages at once
+        template.sendBody("direct:start", "100");
+        template.sendBody("direct:start", "150");
+        template.sendBody("direct:start", "130");
+        template.sendBody("direct:start", "200");
+        template.sendBody("direct:start", "190");
+
+        assertMockEndpointsSatisfied();
+
+        Exchange out = result.getExchanges().get(0);
+        assertTrue(out instanceof GroupedExchange);
+        GroupedExchange grouped = (GroupedExchange)out;
+        assertEquals(5, grouped.size());
+
+        assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("130", grouped.get(2).getIn().getBody(String.class));
+        assertEquals("200", grouped.get(3).getIn().getBody(String.class));
+        assertEquals("190", grouped.get(4).getIn().getBody(String.class));
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated using id as correlation so each is unique and thus we batch everything
+                    .aggregate().simple("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    // group the exchanges so we get one single exchange containing all the others
+                    .groupExchanges()
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date