You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/28 08:43:43 UTC

svn commit: r1002036 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java

Author: davsclaus
Date: Tue Sep 28 06:43:42 2010
New Revision: 1002036

URL: http://svn.apache.org/viewvc?rev=1002036&view=rev
Log:
CAMEL-3159: discard on timeout in Aggregate EIP. Added test with hawtdb as well.

Added:
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java
      - copied, changed from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1002036&r1=1002035&r2=1002036&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Sep 28 06:43:42 2010
@@ -365,6 +365,10 @@ public class AggregateProcessor extends 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange);
             }
+            // must confirm the discarded exchange
+            aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
+            // and remove redelivery state as well
+            redeliveryState.remove(exchange.getExchangeId());
             return;
         }
 

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java (from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=1002017&r2=1002036&rev=1002036&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java Tue Sep 28 06:43:42 2010
@@ -20,13 +20,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregateRecoverTest extends CamelTestSupport {
+public class HawtDBAggregateDiscardOnTimeoutTest extends CamelTestSupport {
 
     private static AtomicInteger counter = new AtomicInteger(0);
     private HawtDBAggregationRepository repo;
@@ -43,22 +43,28 @@ public class HawtDBAggregateRecoverTest 
     }
 
     @Test
-    public void testHawtDBAggregateRecover() throws Exception {
-        // should fail the first 2 times and then recover
-        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
-        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
-        // should be marked as redelivered
-        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
-        // on the 2nd redelivery attempt we success
-        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+    public void testAggregateDiscardOnTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        // wait 3 seconds
+        Thread.sleep(3000);
+
+        mock.assertIsSatisfied();
+
+        // now send 3 which does not timeout
+        mock.reset();
+        mock.expectedBodiesReceived("C+D+E");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
-        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+        // should complete before timeout
+        mock.await(1500, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -68,21 +74,12 @@ public class HawtDBAggregateRecoverTest 
             public void configure() throws Exception {
                 from("direct:start")
                     .aggregate(header("id"), new MyAggregationStrategy())
-                        .completionSize(5).aggregationRepository(repo)
-                        .log("aggregated exchange id ${exchangeId} with ${body}")
-                        .to("mock:aggregated")
-                        .delay(1000)
-                        // simulate errors the first two times
-                        .process(new Processor() {
-                            public void process(Exchange exchange) throws Exception {
-                                int count = counter.incrementAndGet();
-                                if (count <= 2) {
-                                    throw new IllegalArgumentException("Damn");
-                                }
-                            }
-                        })
-                        .to("mock:result")
-                    .end();
+                        .completionSize(3).aggregationRepository(repo)
+                        // use a 2 second timeout
+                        .completionTimeout(2000)
+                        // and if timeout occurred then just discard the aggregated message
+                        .discardOnCompletionTimeout()
+                        .to("mock:aggregated");
             }
         };
     }