You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/18 10:42:56 UTC
svn commit: r1351237 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Mon Jun 18 08:42:55 2012
New Revision: 1351237
URL: http://svn.apache.org/viewvc?rev=1351237&view=rev
Log:
CAMEL-5375: Added CompletionAwareAggregationStrategy to the aggregate EIP to allow a callback when the aggregated exchange is complete.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/CompletionAwareAggregationStrategy.java
- copied, changed from r1351221, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionAwareAggregationStrategyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1351237&r1=1351236&r2=1351237&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Jun 18 08:42:55 2012
@@ -408,7 +408,12 @@ public class AggregateProcessor extends
// add this as in progress before we submit the task
inProgressCompleteExchanges.add(exchange.getExchangeId());
- // send this exchange
+ // invoke the on completion callback
+ if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
+ ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
+ }
+
+ // send this exchange
executorService.submit(new Runnable() {
public void run() {
LOG.debug("Processing aggregated exchange: {}", exchange);
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/CompletionAwareAggregationStrategy.java (from r1351221, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/CompletionAwareAggregationStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/CompletionAwareAggregationStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java&r1=1351221&r2=1351237&rev=1351237&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/CompletionAwareAggregationStrategy.java Mon Jun 18 08:42:55 2012
@@ -19,24 +19,21 @@ package org.apache.camel.processor.aggre
import org.apache.camel.Exchange;
/**
- * A specialized {@link org.apache.camel.processor.aggregate.AggregationStrategy} which can handle timeouts as well.
+ * A specialized {@link AggregationStrategy} which has a callback when the aggregated {@link Exchange} is completed.
*
- * @version
+ * @version
*/
-public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {
+public interface CompletionAwareAggregationStrategy extends AggregationStrategy {
// TODO: In Camel 3.0 we should move this to org.apache.camel package
/**
- * A timeout occurred.
- * <p/>
+ * The aggregated {@link Exchange} has completed.
+ *
* <b>Important: </b> This method must <b>not</b> throw any exceptions.
*
- * @param oldExchange the current aggregated exchange, or the original {@link Exchange} if no aggregation
- * has been done before the timeout occurred
- * @param index the index, may be <tt>-1</tt> if not possible to determine the index
- * @param total the total, may be <tt>-1</tt> if not possible to determine the total
- * @param timeout the timeout value in millis, may be <tt>-1</tt> if not possible to determine the timeout
+ * @param exchange the current aggregated exchange, or the original {@link org.apache.camel.Exchange} if no aggregation
+ * has been done before the completion occurred
*/
- void timeout(Exchange oldExchange, int index, int total, long timeout);
+ void onCompletion(Exchange exchange);
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionAwareAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionAwareAggregationStrategyTest.java?rev=1351237&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionAwareAggregationStrategyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionAwareAggregationStrategyTest.java Mon Jun 18 08:42:55 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+
+/**
+ * Unit test verifying that the aggregator invokes {@link CompletionAwareAggregationStrategy#onCompletion(Exchange)} on the completed exchange: headers set in the callback must arrive at mock:aggregated.
+ */
+public class AggregateCompletionAwareAggregationStrategyTest extends ContextTestSupport {
+
+ public void testAggregateCompletionAware() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+ getMockEndpoint("mock:aggregated").expectedHeaderReceived("bodyCopy", "A+B+C");
+ getMockEndpoint("mock:aggregated").expectedHeaderReceived("myStrategyCompleted", true);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyCompletionStrategy()).completionSize(3)
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private final class MyCompletionStrategy implements CompletionAwareAggregationStrategy {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(oldBody + "+" + newBody);
+ return oldExchange;
+ }
+
+ @Override
+ public void onCompletion(Exchange exchange) {
+ // copy body so we can test what the body was when this callback was invoked
+ exchange.getIn().setHeader("bodyCopy", exchange.getIn().getBody());
+ // add a header so we know we were called
+ exchange.getIn().setHeader("myStrategyCompleted", true);
+ }
+ }
+}