You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/26 16:30:47 UTC

svn commit: r1001436 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/ components/camel-spring/src/test/jav...

Author: davsclaus
Date: Sun Sep 26 14:30:47 2010
New Revision: 1001436

URL: http://svn.apache.org/viewvc?rev=1001436&view=rev
Log:
CAMEL-3159: Added discardOnCompletionTimeout to aggregator EIP.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
      - copied, changed from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java
      - copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml
      - copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1001436&r1=1001435&r2=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sun Sep 26 14:30:47 2010
@@ -90,6 +90,8 @@ public class AggregateDefinition extends
     private Boolean ignoreInvalidCorrelationKeys;
     @XmlAttribute
     private Integer closeCorrelationKeyOnCompletion;
+    @XmlAttribute
+    private Boolean discardOnCompletionTimeout;
 
     public AggregateDefinition() {
     }
@@ -203,6 +205,9 @@ public class AggregateDefinition extends
         if (getCloseCorrelationKeyOnCompletion() != null) {
             answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
         }
+        if (isDiscardOnCompletionTimeout() != null) {
+            answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
+        }
 
         return answer;
     }
@@ -390,6 +395,14 @@ public class AggregateDefinition extends
         this.aggregationRepositoryRef = aggregationRepositoryRef;
     }
 
+    public Boolean isDiscardOnCompletionTimeout() {
+        return discardOnCompletionTimeout;
+    }
+
+    public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
+        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
+    }
+
     // Fluent API
     //-------------------------------------------------------------------------
 
@@ -430,6 +443,18 @@ public class AggregateDefinition extends
     }
 
     /**
+     * Discards the aggregated message on completion timeout.
+     * <p/>
+     * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
+     *
+     * @return builder
+     */
+    public AggregateDefinition discardOnCompletionTimeout() {
+        setDiscardOnCompletionTimeout(true);
+        return this;
+    }
+
+    /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
      * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
@@ -641,4 +666,5 @@ public class AggregateDefinition extends
     public void setOutputs(List<ProcessorDefinition> outputs) {
         this.outputs = outputs;
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1001436&r1=1001435&r2=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Sep 26 14:30:47 2010
@@ -118,6 +118,7 @@ public class AggregateProcessor extends 
     private Expression completionSizeExpression;
     private boolean completionFromBatchConsumer;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
+    private boolean discardOnCompletionTimeout;
 
     public AggregateProcessor(CamelContext camelContext, Processor processor,
                               Expression correlationExpression, AggregationStrategy aggregationStrategy,
@@ -359,6 +360,14 @@ public class AggregateProcessor extends 
             closedCorrelationKeys.put(key, key);
         }
 
+        if (fromTimeout && isDiscardOnCompletionTimeout()) {
+            // discard due timeout
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange);
+            }
+            return;
+        }
+
         onSubmitCompletion(key, exchange);
     }
 
@@ -503,6 +512,14 @@ public class AggregateProcessor extends 
         this.aggregationRepository = aggregationRepository;
     }
 
+    public boolean isDiscardOnCompletionTimeout() {
+        return discardOnCompletionTimeout;
+    }
+
+    public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
+        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
+    }
+
     /**
      * On completion task which keeps the booking of the in progress up to date
      */

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010
@@ -16,23 +16,40 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
 
 /**
  * @version $Revision$
  */
-public class AggregateSimpleTimeoutTest extends ContextTestSupport {
+public class AggregateDiscardOnTimeoutTest extends ContextTestSupport {
+
+    public void testAggregateDiscardOnTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregated");
+        mock.expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+        // wait 3 seconds
+        Thread.sleep(3000);
+
+        mock.assertIsSatisfied();
 
-    public void testAggregateSimpleTimeout() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+        // now send 3 which does not timeout
+        mock.reset();
+        mock.expectedBodiesReceived("C+D+E");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
 
-        assertMockEndpointsSatisfied();
+        // should complete before timeout
+        mock.await(1500, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -42,11 +59,12 @@ public class AggregateSimpleTimeoutTest 
             public void configure() throws Exception {
                 // START SNIPPET: e1
                 from("direct:start")
-                    // aggregate all exchanges correlated by the id header.
-                    // Aggregate them using the BodyInAggregatingStrategy strategy which
-                    // and after 3 seconds of inactivity them timeout and complete the aggregation
-                    // and send it to mock:aggregated
-                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000)
+                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(3)
+                        // use a 3 second timeout
+                        .completionTimeout(2000)
+                        // and if timeout occurred then just discard the aggregated message
+                        .discardOnCompletionTimeout()
                         .to("mock:aggregated");
                 // END SNIPPET: e1
             }

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010
@@ -17,14 +17,14 @@
 package org.apache.camel.spring.processor.aggregator;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest;
+import org.apache.camel.processor.aggregator.AggregateDiscardOnTimeoutTest;
 
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest {
+public class SpringAggregateDiscardOnTimeoutTest extends AggregateDiscardOnTimeoutTest {
 
     protected CamelContext createCamelContext() throws Exception {
         return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml");

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml (from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml Sun Sep 26 14:30:47 2010
@@ -26,7 +26,7 @@
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
+            <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000" discardOnCompletionTimeout="true">
                 <correlationExpression>
                     <simple>header.id</simple>
                 </correlationExpression>