You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/26 16:30:47 UTC
svn commit: r1001436 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/test/java/org/apache/camel/processor/aggregator/
components/camel-spring/src/test/jav...
Author: davsclaus
Date: Sun Sep 26 14:30:47 2010
New Revision: 1001436
URL: http://svn.apache.org/viewvc?rev=1001436&view=rev
Log:
CAMEL-3159: Added discardOnCompletionTimeout to aggregator EIP.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
- copied, changed from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java
- copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml
- copied, changed from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1001436&r1=1001435&r2=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Sun Sep 26 14:30:47 2010
@@ -90,6 +90,8 @@ public class AggregateDefinition extends
private Boolean ignoreInvalidCorrelationKeys;
@XmlAttribute
private Integer closeCorrelationKeyOnCompletion;
+ @XmlAttribute
+ private Boolean discardOnCompletionTimeout;
public AggregateDefinition() {
}
@@ -203,6 +205,9 @@ public class AggregateDefinition extends
if (getCloseCorrelationKeyOnCompletion() != null) {
answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
}
+ if (isDiscardOnCompletionTimeout() != null) {
+ answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
+ }
return answer;
}
@@ -390,6 +395,14 @@ public class AggregateDefinition extends
this.aggregationRepositoryRef = aggregationRepositoryRef;
}
+ public Boolean isDiscardOnCompletionTimeout() {
+ return discardOnCompletionTimeout;
+ }
+
+ public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
+ this.discardOnCompletionTimeout = discardOnCompletionTimeout;
+ }
+
// Fluent API
//-------------------------------------------------------------------------
@@ -430,6 +443,18 @@ public class AggregateDefinition extends
}
/**
+ * Discards the aggregated message on completion timeout.
+ * <p/>
+ * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
+ *
+ * @return builder
+ */
+ public AggregateDefinition discardOnCompletionTimeout() {
+ setDiscardOnCompletionTimeout(true);
+ return this;
+ }
+
+ /**
* Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
* and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
* as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
@@ -641,4 +666,5 @@ public class AggregateDefinition extends
public void setOutputs(List<ProcessorDefinition> outputs) {
this.outputs = outputs;
}
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1001436&r1=1001435&r2=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Sep 26 14:30:47 2010
@@ -118,6 +118,7 @@ public class AggregateProcessor extends
private Expression completionSizeExpression;
private boolean completionFromBatchConsumer;
private AtomicInteger batchConsumerCounter = new AtomicInteger();
+ private boolean discardOnCompletionTimeout;
public AggregateProcessor(CamelContext camelContext, Processor processor,
Expression correlationExpression, AggregationStrategy aggregationStrategy,
@@ -359,6 +360,14 @@ public class AggregateProcessor extends
closedCorrelationKeys.put(key, key);
}
+ if (fromTimeout && isDiscardOnCompletionTimeout()) {
+ // discard due to timeout
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange);
+ }
+ return;
+ }
+
onSubmitCompletion(key, exchange);
}
@@ -503,6 +512,14 @@ public class AggregateProcessor extends
this.aggregationRepository = aggregationRepository;
}
+ public boolean isDiscardOnCompletionTimeout() {
+ return discardOnCompletionTimeout;
+ }
+
+ public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
+ this.discardOnCompletionTimeout = discardOnCompletionTimeout;
+ }
+
/**
* On completion task which keeps the booking of the in progress up to date
*/
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010
@@ -16,23 +16,40 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.BodyInAggregatingStrategy;
/**
* @version $Revision$
*/
-public class AggregateSimpleTimeoutTest extends ContextTestSupport {
+public class AggregateDiscardOnTimeoutTest extends ContextTestSupport {
+
+ public void testAggregateDiscardOnTimeout() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ mock.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ // wait 3 seconds
+ Thread.sleep(3000);
+
+ mock.assertIsSatisfied();
- public void testAggregateSimpleTimeout() throws Exception {
- getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C");
+ // now send 3 messages, which should complete before the timeout
+ mock.reset();
+ mock.expectedBodiesReceived("C+D+E");
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
- assertMockEndpointsSatisfied();
+ // should complete before timeout
+ mock.await(1500, TimeUnit.MILLISECONDS);
}
@Override
@@ -42,11 +59,12 @@ public class AggregateSimpleTimeoutTest
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and after 3 seconds of inactivity them timeout and complete the aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000)
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(3)
+ // use a 2 second timeout
+ .completionTimeout(2000)
+ // and if timeout occurred then just discard the aggregated message
+ .discardOnCompletionTimeout()
.to("mock:aggregated");
// END SNIPPET: e1
}
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java (from r1001415, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.java Sun Sep 26 14:30:47 2010
@@ -17,14 +17,14 @@
package org.apache.camel.spring.processor.aggregator;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest;
+import org.apache.camel.processor.aggregator.AggregateDiscardOnTimeoutTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
* @version $Revision$
*/
-public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest {
+public class SpringAggregateDiscardOnTimeoutTest extends AggregateDiscardOnTimeoutTest {
protected CamelContext createCamelContext() throws Exception {
return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml");
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml (from r1001415, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=1001415&r2=1001436&rev=1001436&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateDiscardOnTimeoutTest.xml Sun Sep 26 14:30:47 2010
@@ -26,7 +26,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
+ <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000" discardOnCompletionTimeout="true">
<correlationExpression>
<simple>header.id</simple>
</correlationExpression>