You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/17 17:44:38 UTC
svn commit: r745139 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/file/ main/java/org/apache/camel/impl/
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Tue Feb 17 16:44:38 2009
New Revision: 745139
URL: http://svn.apache.org/viewvc?rev=745139&view=rev
Log:
CAMEL-971: Introduced GroupedExchange for aggregating N exchanges into a single grouped exchange holding those N exchanges.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java Tue Feb 17 16:44:38 2009
@@ -53,7 +53,7 @@
/**
* Header key holding file path to a local work directory containg a consumed file (if any)
*/
- public static final String HEADER_FILE_LOCAL_WORK_PATH= "CamelFileLocalWorkPath";
+ public static final String HEADER_FILE_LOCAL_WORK_PATH = "CamelFileLocalWorkPath";
protected GenericFileEndpoint<File> buildFileEndpoint(String uri, String remaining, Map parameters) throws Exception {
File file = new File(remaining);
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java?rev=745139&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java Tue Feb 17 16:44:38 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+
+
+/**
+ * A grouped exchange that groups together other exchanges, as a holder object.
+ * <p/>
+ * This grouped exchange is usable by the aggregator so multiple exchanges can be grouped
+ * into this single exchange and thus only one exchange is sent for further processing.
+ */
+public class GroupedExchange extends DefaultExchange {
+
+ private List<Exchange> exchanges = new ArrayList<Exchange>();
+
+ public GroupedExchange(CamelContext context) {
+ super(context);
+ }
+
+ public GroupedExchange(CamelContext context, ExchangePattern pattern) {
+ super(context, pattern);
+ }
+
+ public GroupedExchange(Exchange parent) {
+ super(parent);
+ }
+
+ public GroupedExchange(Endpoint fromEndpoint) {
+ super(fromEndpoint);
+ }
+
+ public GroupedExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
+ super(fromEndpoint, pattern);
+ }
+
+ public List<Exchange> getExchanges() {
+ return exchanges;
+ }
+
+ public void setExchanges(List<Exchange> exchanges) {
+ this.exchanges = exchanges;
+ }
+
+ public void addExchange(Exchange exchange) {
+ this.exchanges.add(exchange);
+ }
+
+ public int size() {
+ return exchanges.size();
+ }
+
+ public Exchange get(int index) {
+ return exchanges.get(index);
+ }
+
+ @Override
+ public String toString() {
+ return "Exchange[Grouped with: " + exchanges.size() + " exchanges]";
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Tue Feb 17 16:44:38 2009
@@ -66,9 +66,12 @@
private String strategyRef;
@XmlAttribute(required = false)
private String collectionRef;
+ @XmlAttribute(required = false)
+ private Boolean groupExchanges;
@XmlElement(name = "completedPredicate", required = false)
private ExpressionSubElementType completedPredicate;
+
public AggregatorType() {
}
@@ -168,6 +171,10 @@
if (outBatchSize != null) {
aggregator.setOutBatchSize(outBatchSize);
}
+
+ if (groupExchanges != null) {
+ aggregator.setGroupExchanges(groupExchanges);
+ }
return aggregator;
}
@@ -256,6 +263,14 @@
return completedPredicate;
}
+ public Boolean getGroupExchanges() {
+ return groupExchanges;
+ }
+
+ public void setGroupExchanges(Boolean groupExchanges) {
+ this.groupExchanges = groupExchanges;
+ }
+
// Fluent API
//-------------------------------------------------------------------------
@@ -337,6 +352,17 @@
}
/**
+ * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
+ * combined {@link org.apache.camel.impl.GroupedExchange} class holding all the aggregated exchanges.
+ *
+ * @return the builder
+ */
+ public AggregatorType groupExchanges() {
+ setGroupExchanges(true);
+ return this;
+ }
+
+ /**
* Sets the predicate used to determine if the aggregation is completed
*
* @return the clause used to create the predicate
@@ -402,5 +428,5 @@
if (isInheritErrorHandler()) {
output.setErrorHandlerBuilder(getErrorHandlerBuilder());
}
- }
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=745139&r1=745138&r2=745139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Feb 17 16:44:38 2009
@@ -22,6 +22,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.GroupedExchange;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
@@ -42,13 +43,14 @@
private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
private int batchSize = DEFAULT_BATCH_SIZE;
private int outBatchSize;
+ private boolean groupExchanges;
private Processor processor;
private Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
private BatchSender sender;
-
+
public BatchProcessor(Processor processor, Collection<Exchange> collection) {
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(collection, "collection");
@@ -112,6 +114,14 @@
this.batchTimeout = batchTimeout;
}
+ public boolean isGroupExchanges() {
+ return groupExchanges;
+ }
+
+ public void setGroupExchanges(boolean groupExchanges) {
+ this.groupExchanges = groupExchanges;
+ }
+
public Processor getProcessor() {
return processor;
}
@@ -219,11 +229,27 @@
}
private void sendExchanges() throws Exception {
+ GroupedExchange grouped = null;
+
Iterator<Exchange> iter = collection.iterator();
while (iter.hasNext()) {
Exchange exchange = iter.next();
iter.remove();
- processExchange(exchange);
+ if (!groupExchanges) {
+ // non grouped so process the exchange one at a time
+ processExchange(exchange);
+ } else {
+ // grouped so add all exchanges into one group
+ if (grouped == null) {
+ grouped = new GroupedExchange(exchange.getContext());
+ }
+ grouped.addExchange(exchange);
+ }
+ }
+
+ // and after adding process the single grouped exchange
+ if (grouped != null) {
+ processExchange(grouped);
}
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=745139&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Tue Feb 17 16:44:38 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.GroupedExchange;
+
+/**
+ * Unit test for aggregate grouped exchanges.
+ */
+public class AggregateGroupedExchangeTest extends ContextTestSupport {
+
+ public void testGrouped() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 1 message since we group everything we receive using the same correlation key
+ result.expectedMessageCount(1);
+
+ // then we send all the messages at once
+ template.sendBody("direct:start", "100");
+ template.sendBody("direct:start", "150");
+ template.sendBody("direct:start", "130");
+ template.sendBody("direct:start", "200");
+ template.sendBody("direct:start", "190");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange out = result.getExchanges().get(0);
+ assertTrue(out instanceof GroupedExchange);
+ GroupedExchange grouped = (GroupedExchange)out;
+ assertEquals(5, grouped.size());
+
+ assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+ assertEquals("130", grouped.get(2).getIn().getBody(String.class));
+ assertEquals("200", grouped.get(3).getIn().getBody(String.class));
+ assertEquals("190", grouped.get(4).getIn().getBody(String.class));
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending the response to the mock
+ from("direct:start")
+ // aggregated using id as correlation so each is unique and thus we batch everything
+ .aggregate().simple("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ // group the exchanges so we get one single exchange containing all the others
+ .groupExchanges()
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date