You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/15 19:05:14 UTC
svn commit: r910280 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Author: davsclaus
Date: Mon Feb 15 18:05:14 2010
New Revision: 910280
URL: http://svn.apache.org/viewvc?rev=910280&view=rev
Log:
CAMEL-1686: Overhaul of aggregator.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910280&r1=910279&r2=910280&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Feb 15 18:05:14 2010
@@ -17,7 +17,9 @@
package org.apache.camel.processor.aggregate;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -53,9 +55,14 @@
private final Expression correlationExpression;
private ExecutorService executorService;
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
+ private Set<Object> closedCorrelationKeys = new HashSet<Object>();
+
+ // options
+ private boolean ignoreBadCorrelationKeys;
+ private boolean closeCorrelationKeyOnCompletion;
// different ways to have completion triggered
- private boolean eagerEvaluateCompletionPredicate;
+ private boolean eagerCheckCompletion;
private Predicate completionPredicate;
private long completionTimeout;
private int completionAggregatedSize;
@@ -91,7 +98,22 @@
// compute correlation expression
Object key = correlationExpression.evaluate(exchange, Object.class);
if (ObjectHelper.isEmpty(key)) {
- throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange);
+ // we have a bad correlation key
+ if (isIgnoreBadCorrelationKeys()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Correlation key could not be evaluated to a value. Exchange will be ignored: " + exchange);
+ }
+ return;
+ } else {
+ throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange);
+ }
+ }
+
+ // is the correlation key closed?
+ if (isCloseCorrelationKeyOnCompletion()) {
+ if (closedCorrelationKeys.contains(key)) {
+ throw new CamelExchangeException("Correlation key has been closed", exchange);
+ }
}
Exchange oldExchange = aggregationRepository.get(key);
@@ -104,9 +126,9 @@
size++;
}
- // are we complete?
+ // check if we are complete
boolean complete = false;
- if (isEagerEvaluateCompletionPredicate()) {
+ if (isEagerCheckCompletion()) {
complete = isCompleted(key, exchange, size);
}
@@ -115,8 +137,9 @@
newExchange = onAggregation(oldExchange, newExchange);
newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
- // if not set to evaluate eager then do that after the aggregation
- if (!isEagerEvaluateCompletionPredicate()) {
+ // maybe we should check completion after the aggregation
+ if (!isEagerCheckCompletion()) {
+ // use the new aggregated exchange when testing
complete = isCompleted(key, newExchange, size);
}
@@ -169,6 +192,11 @@
timeoutMap.remove(key);
}
+ // this key has been closed so add it to the set of closed correlation keys
+ if (isCloseCorrelationKeyOnCompletion()) {
+ closedCorrelationKeys.add(key);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange);
}
@@ -201,12 +229,12 @@
this.completionPredicate = completionPredicate;
}
- public boolean isEagerEvaluateCompletionPredicate() {
- return eagerEvaluateCompletionPredicate;
+ public boolean isEagerCheckCompletion() {
+ return eagerCheckCompletion;
}
- public void setEagerEvaluateCompletionPredicate(boolean eagerEvaluateCompletionPredicate) {
- this.eagerEvaluateCompletionPredicate = eagerEvaluateCompletionPredicate;
+ public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
+ this.eagerCheckCompletion = eagerCheckCompletion;
}
public long getCompletionTimeout() {
@@ -225,6 +253,22 @@
this.completionAggregatedSize = completionAggregatedSize;
}
+ public boolean isIgnoreBadCorrelationKeys() {
+ return ignoreBadCorrelationKeys;
+ }
+
+ public void setIgnoreBadCorrelationKeys(boolean ignoreBadCorrelationKeys) {
+ this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
+ }
+
+ public boolean isCloseCorrelationKeyOnCompletion() {
+ return closeCorrelationKeyOnCompletion;
+ }
+
+ public void setCloseCorrelationKeyOnCompletion(boolean closeCorrelationKeyOnCompletion) {
+ this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
+ }
+
/**
* Background tasks that look for aggregated exchanges which are triggered by completion timeouts.
*/
@@ -274,6 +318,7 @@
}
ServiceHelper.stopService(aggregationRepository);
+ closedCorrelationKeys.clear();
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910280&r1=910279&r2=910280&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Feb 15 18:05:14 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.aggregator;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -49,7 +50,7 @@
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
ap.setCompletionPredicate(complete);
- ap.setEagerEvaluateCompletionPredicate(false);
+ ap.setEagerCheckCompletion(false);
ap.start();
Exchange e1 = new DefaultExchange(context);
@@ -89,7 +90,7 @@
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
ap.setCompletionPredicate(complete);
- ap.setEagerEvaluateCompletionPredicate(true);
+ ap.setEagerCheckCompletion(true);
ap.start();
Exchange e1 = new DefaultExchange(context);
@@ -136,7 +137,7 @@
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
ap.setCompletionAggregatedSize(3);
- ap.setEagerEvaluateCompletionPredicate(eager);
+ ap.setEagerCheckCompletion(eager);
ap.start();
Exchange e1 = new DefaultExchange(context);
@@ -183,7 +184,7 @@
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
ap.setCompletionTimeout(3000);
- ap.setEagerEvaluateCompletionPredicate(eager);
+ ap.setEagerCheckCompletion(eager);
ap.start();
Exchange e1 = new DefaultExchange(context);
@@ -218,4 +219,138 @@
ap.stop();
}
+ public void testAggregateIgnoreBadCorrelationKey() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+C+END");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+ Predicate complete = body().contains("END");
+
+ AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ ap.setCompletionPredicate(complete);
+ ap.setIgnoreBadCorrelationKeys(true);
+
+ ap.start();
+
+ Exchange e1 = new DefaultExchange(context);
+ e1.getIn().setBody("A");
+ e1.getIn().setHeader("id", 123);
+
+ Exchange e2 = new DefaultExchange(context);
+ e2.getIn().setBody("B");
+
+ Exchange e3 = new DefaultExchange(context);
+ e3.getIn().setBody("C");
+ e3.getIn().setHeader("id", 123);
+
+ Exchange e4 = new DefaultExchange(context);
+ e4.getIn().setBody("END");
+ e4.getIn().setHeader("id", 123);
+
+ ap.process(e1);
+ ap.process(e2);
+ ap.process(e3);
+ ap.process(e4);
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
+
+ public void testAggregateBadCorrelationKey() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+C+END");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+ Predicate complete = body().contains("END");
+
+ AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ ap.setCompletionPredicate(complete);
+
+ ap.start();
+
+ Exchange e1 = new DefaultExchange(context);
+ e1.getIn().setBody("A");
+ e1.getIn().setHeader("id", 123);
+
+ Exchange e2 = new DefaultExchange(context);
+ e2.getIn().setBody("B");
+
+ Exchange e3 = new DefaultExchange(context);
+ e3.getIn().setBody("C");
+ e3.getIn().setHeader("id", 123);
+
+
+ Exchange e4 = new DefaultExchange(context);
+ e4.getIn().setBody("END");
+ e4.getIn().setHeader("id", 123);
+
+ ap.process(e1);
+
+ try {
+ ap.process(e2);
+ fail("Should have thrown an exception");
+ } catch (CamelExchangeException e) {
+ assertEquals("Correlation key could not be evaluated to a value. Exchange[Message: B]", e.getMessage());
+ }
+
+ ap.process(e3);
+ ap.process(e4);
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
+
+ public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+B+END");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+ Predicate complete = body().contains("END");
+
+ AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+ ap.setCompletionPredicate(complete);
+ ap.setCloseCorrelationKeyOnCompletion(true);
+
+ ap.start();
+
+ Exchange e1 = new DefaultExchange(context);
+ e1.getIn().setBody("A");
+ e1.getIn().setHeader("id", 123);
+
+ Exchange e2 = new DefaultExchange(context);
+ e2.getIn().setBody("B");
+ e2.getIn().setHeader("id", 123);
+
+ Exchange e3 = new DefaultExchange(context);
+ e3.getIn().setBody("END");
+ e3.getIn().setHeader("id", 123);
+
+ Exchange e4 = new DefaultExchange(context);
+ e4.getIn().setBody("C");
+ e4.getIn().setHeader("id", 123);
+
+ ap.process(e1);
+ ap.process(e2);
+ ap.process(e3);
+
+ try {
+ ap.process(e4);
+ fail("Should have thrown an exception");
+ } catch (CamelExchangeException e) {
+ assertEquals("Correlation key has been closed. Exchange[Message: C]", e.getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
+
}