You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/08 11:20:21 UTC

svn commit: r931845 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/aggregator/ components/camel-spring/src/test/...

Author: davsclaus
Date: Thu Apr  8 09:20:19 2010
New Revision: 931845

URL: http://svn.apache.org/viewvc?rev=931845&view=rev
Log:
CAMEL-2568: Polished the aggregator and fixed a completion timeout issue where, in some rare cases, the last received exchange could be sent instead of the aggregated exchange.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
      - copied, changed from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java   (contents, props changed)
      - copied, changed from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java
Removed:
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931845&r1=931844&r2=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr  8 09:20:19 2010
@@ -81,7 +81,8 @@ public class AggregateProcessor extends 
     private final Expression correlationExpression;
     private final ExecutorService executorService;
     private ScheduledExecutorService recoverService;
-    private TimeoutMap<Object, Exchange> timeoutMap;
+    // store correlation key -> exchange id in timeout map
+    private TimeoutMap<Object, String> timeoutMap;
     private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
     private Map<Object, Object> closedCorrelationKeys;
@@ -203,7 +204,7 @@ public class AggregateProcessor extends 
         }
 
         // check if we are complete
-        boolean complete = false;
+        String complete = null;
         if (isEagerCheckCompletion()) {
             // put the current aggregated size on the exchange so its avail during completion check
             newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
@@ -224,14 +225,13 @@ public class AggregateProcessor extends 
         }
 
         // only need to update aggregation repository if we are not complete
-        if (!complete) {
+        if (complete == null) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key);
             }
             aggregationRepository.add(exchange.getContext(), key, answer);
-        }
-
-        if (complete) {
+        } else {
+            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
             onCompletion(key, answer, false);
         }
 
@@ -242,12 +242,18 @@ public class AggregateProcessor extends 
         return answer;
     }
 
-    protected boolean isCompleted(Object key, Exchange exchange) {
+    /**
+     * Tests whether the given exchange is complete or not
+     *
+     * @param key       the correlation key
+     * @param exchange  the incoming exchange
+     * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
+     */
+    protected String isCompleted(Object key, Exchange exchange) {
         if (getCompletionPredicate() != null) {
             boolean answer = getCompletionPredicate().matches(exchange);
             if (answer) {
-                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
-                return true;
+                return "predicate";
             }
         }
 
@@ -256,16 +262,14 @@ public class AggregateProcessor extends 
             if (value != null && value > 0) {
                 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
                 if (size >= value) {
-                    exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size");
-                    return true;
+                    return "size";
                 }
             }
         }
         if (getCompletionSize() > 0) {
             int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
             if (size >= getCompletionSize()) {
-                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size");
-                return true;
+                return "size";
             }
         }
 
@@ -279,7 +283,7 @@ public class AggregateProcessor extends 
                     LOG.trace("Updating correlation key " + key + " to timeout after "
                             + value + " ms. as exchange received: " + exchange);
                 }
-                timeoutMap.put(key, exchange, value);
+                timeoutMap.put(key, exchange.getExchangeId(), value);
                 timeoutSet = true;
             }
         }
@@ -289,7 +293,7 @@ public class AggregateProcessor extends 
                 LOG.trace("Updating correlation key " + key + " to timeout after "
                         + getCompletionTimeout() + " ms. as exchange received: " + exchange);
             }
-            timeoutMap.put(key, exchange, getCompletionTimeout());
+            timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout());
         }
 
         if (isCompletionFromBatchConsumer()) {
@@ -298,12 +302,12 @@ public class AggregateProcessor extends 
             if (size > 0 && batchConsumerCounter.intValue() >= size) {
                 // batch consumer is complete then reset the counter
                 batchConsumerCounter.set(0);
-                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "consumer");
-                return true;
+                return "consumer";
             }
         }
 
-        return false;
+        // not complete
+        return null;
     }
 
     protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
@@ -498,23 +502,27 @@ public class AggregateProcessor extends 
     /**
      * Background task that looks for aggregated exchanges which is triggered by completion timeouts.
      */
-    private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> {
+    private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, String> {
 
         private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
             super(executor, requestMapPollTimeMillis);
         }
 
         @Override
-        public void onEviction(Object key, Exchange exchange) {
+        public void onEviction(Object key, String exchangeId) {
             if (log.isDebugEnabled()) {
                 log.debug("Completion timeout triggered for correlation key: " + key);
             }
 
-            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+            // get the aggregated exchange
+            Exchange answer = aggregationRepository.get(camelContext, key);
+
+            // indicate it was completed by timeout
+            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
 
             try {
                 lock.lock();
-                onCompletion(key, exchange, true);
+                onCompletion(key, answer, true);
             } finally {
                 lock.unlock();
             }
@@ -580,12 +588,7 @@ public class AggregateProcessor extends 
                         exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
 
                         // resubmit the recovered exchange
-                        try {
-                            lock.lock();
-                            onSubmitCompletion(key, exchange);
-                        } finally {
-                            lock.unlock();
-                        }
+                        onSubmitCompletion(key, exchange);
                     }
                 }
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=931845&r1=931844&r2=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Thu Apr  8 09:20:19 2010
@@ -23,12 +23,14 @@ import org.apache.camel.processor.aggreg
 public class BodyInAggregatingStrategy implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        if (oldExchange != null) {
-            String oldBody = oldExchange.getIn().getBody(String.class);
-            String newBody = newExchange.getIn().getBody(String.class);
-            newExchange.getIn().setBody(oldBody + "+" + newBody);
+        if (oldExchange == null) {
+            return newExchange;
         }
-        return newExchange;
+
+        String oldBody = oldExchange.getIn().getBody(String.class);
+        String newBody = newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(oldBody + "+" + newBody);
+        return oldExchange;
     }
 
     /**

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java?rev=931845&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java Thu Apr  8 09:20:19 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for aggregate grouped exchanges completed by size
+ */
+public class AggregateGroupedExchangeSizePredicateTest extends ContextTestSupport {
+
+    public void testGroupedSize() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 2 messages since we group by size (3 and 2)
+        result.expectedMessageCount(2);
+
+        template.sendBodyAndHeader("direct:start", "100", "groupSize", 3);
+        template.sendBodyAndHeader("direct:start", "150", "groupSize", 3);
+        template.sendBodyAndHeader("direct:start", "130", "groupSize", 3);
+        template.sendBodyAndHeader("direct:start", "200", "groupSize", 2);
+        template.sendBodyAndHeader("direct:start", "190", "groupSize", 2);
+
+        assertMockEndpointsSatisfied();
+
+        Exchange out = result.getExchanges().get(0);
+        List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        assertEquals(3, grouped.size());
+        assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("130", grouped.get(2).getIn().getBody(String.class));
+
+        out = result.getExchanges().get(1);
+        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        assertEquals(2, grouped.size());
+
+        assertEquals("200", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("190", grouped.get(1).getIn().getBody(String.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start")
+                    // must use eagerCheckCompletion so we can check the groupSize header on the incoming exchange 
+                    .aggregate().constant(true).groupExchanges().eagerCheckCompletion().completionSize(header("groupSize"))
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java (from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=931784&r2=931845&rev=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java Thu Apr  8 09:20:19 2010
@@ -24,55 +24,50 @@ import org.apache.camel.builder.RouteBui
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for aggregate grouped exchanges.
+ * Unit test for aggregate grouped exchanges completed by size
  */
-public class AggregateGroupedExchangeTest extends ContextTestSupport {
+public class AggregateGroupedExchangeSizeTest extends ContextTestSupport {
 
-    public void testGrouped() throws Exception {
-        // START SNIPPET: e2
+    public void testGroupedSize() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
 
-        // we expect 1 messages since we group all we get in using the same correlation key
-        result.expectedMessageCount(1);
+        // we expect 2 messages since we group by size (3 and 3)
+        result.expectedMessageCount(2);
 
-        // then we sent all the message at once
         template.sendBody("direct:start", "100");
         template.sendBody("direct:start", "150");
         template.sendBody("direct:start", "130");
         template.sendBody("direct:start", "200");
         template.sendBody("direct:start", "190");
+        template.sendBody("direct:start", "120");
 
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
         List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-
-        assertEquals(5, grouped.size());
-
+        assertEquals(3, grouped.size());
         assertEquals("100", grouped.get(0).getIn().getBody(String.class));
         assertEquals("150", grouped.get(1).getIn().getBody(String.class));
         assertEquals("130", grouped.get(2).getIn().getBody(String.class));
-        assertEquals("200", grouped.get(3).getIn().getBody(String.class));
-        assertEquals("190", grouped.get(4).getIn().getBody(String.class));
-        // END SNIPPET: e2
+
+        out = result.getExchanges().get(1);
+        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+        assertEquals(3, grouped.size());
+
+        assertEquals("200", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("190", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("120", grouped.get(2).getIn().getBody(String.class));
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
                 from("direct:start")
-                    // aggregate all using same expression
-                    .aggregate().constant(true)
-                    // wait for 0.5 seconds to aggregate
-                    .completionTimeout(500L)
-                    // group the exchanges so we get one single exchange containing all the others
-                    .groupExchanges()
-                    .to("mock:result");
-                // END SNIPPET: e1
+                    .aggregate().constant(true).groupExchanges().completionSize(3)
+                        .to("mock:result")
+                    .end();
             }
         };
     }
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java&r1=931784&r2=931845&rev=931845&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java Thu Apr  8 09:20:19 2010
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.spring.processor;
+package org.apache.camel.spring.processor.aggregator;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.AggregatorTest;
+import org.apache.camel.processor.aggregator.AggregatorTest;
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date