You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/17 13:28:21 UTC
svn commit: r910948 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/model/
main/java/org/apache/camel/processor/aggregate/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Wed Feb 17 12:28:21 2010
New Revision: 910948
URL: http://svn.apache.org/viewvc?rev=910948&view=rev
Log:
CAMEL-1686: Overhaul of aggregator. Work in progress.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 12:28:21 2010
@@ -77,7 +77,7 @@
@XmlAttribute(required = false)
private Boolean ignoreBadCorrelationKeys;
@XmlAttribute(required = false)
- private Boolean closeCorrelationKeyOnCompletion;
+ private Integer closeCorrelationKeyOnCompletion;
public AggregateDefinition() {
}
@@ -159,15 +159,15 @@
if (isCompletionFromBatchConsumer() != null) {
answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
}
- if (isCloseCorrelationKeyOnCompletion() != null) {
- answer.setCloseCorrelationKeyOnCompletion(isCloseCorrelationKeyOnCompletion());
- }
if (isEagerCheckCompletion() != null) {
answer.setEagerCheckCompletion(isEagerCheckCompletion());
}
if (isIgnoreBadCorrelationKeys() != null) {
answer.setIgnoreBadCorrelationKeys(isIgnoreBadCorrelationKeys());
}
+ if (getCloseCorrelationKeyOnCompletion() != null) {
+ answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
+ }
return answer;
}
@@ -298,11 +298,11 @@
this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
}
- public Boolean isCloseCorrelationKeyOnCompletion() {
+ public Integer getCloseCorrelationKeyOnCompletion() {
return closeCorrelationKeyOnCompletion;
}
- public void setCloseCorrelationKeyOnCompletion(Boolean closeCorrelationKeyOnCompletion) {
+ public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
}
@@ -336,10 +336,12 @@
* that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
* is thrown.
*
+ * @param capacity the maximum capacity of the closed correlation key cache.
+ * Use <tt>0</tt> or negative value for unbounded capacity.
* @return builder
*/
- public AggregateDefinition closeCorrelationKeyOnCompletion() {
- setCloseCorrelationKeyOnCompletion(true);
+ public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
+ setCloseCorrelationKeyOnCompletion(capacity);
return this;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 12:28:21 2010
@@ -17,9 +17,9 @@
package org.apache.camel.processor.aggregate;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +35,7 @@
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.DefaultTimeoutMap;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.TimeoutMap;
@@ -71,11 +72,11 @@
private ExecutorService executorService;
private ExceptionHandler exceptionHandler;
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
- private Set<Object> closedCorrelationKeys = new HashSet<Object>();
+ private Map<Object, Object> closedCorrelationKeys;
// options
private boolean ignoreBadCorrelationKeys;
- private boolean closeCorrelationKeyOnCompletion;
+ private Integer closeCorrelationKeyOnCompletion;
private boolean parallelProcessing;
// different ways to have completion triggered
@@ -133,10 +134,8 @@
}
// is the correlation key closed?
- if (isCloseCorrelationKeyOnCompletion()) {
- if (closedCorrelationKeys.contains(key)) {
- throw new ClosedCorrelationKeyException(key, exchange);
- }
+ if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
+ throw new ClosedCorrelationKeyException(key, exchange);
}
doAggregation(key, exchange);
@@ -250,8 +249,8 @@
}
// this key has been closed so add it to the closed map
- if (isCloseCorrelationKeyOnCompletion()) {
- closedCorrelationKeys.add(key);
+ if (closedCorrelationKeys != null) {
+ closedCorrelationKeys.put(key, key);
}
if (LOG.isDebugEnabled()) {
@@ -326,11 +325,11 @@
this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
}
- public boolean isCloseCorrelationKeyOnCompletion() {
+ public Integer getCloseCorrelationKeyOnCompletion() {
return closeCorrelationKeyOnCompletion;
}
- public void setCloseCorrelationKeyOnCompletion(boolean closeCorrelationKeyOnCompletion) {
+ public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
}
@@ -386,6 +385,17 @@
+ " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
}
+ if (getCloseCorrelationKeyOnCompletion() != null) {
+ if (getCloseCorrelationKeyOnCompletion() > 0) {
+ LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
+ closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion());
+ } else {
+ LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
+ closedCorrelationKeys = new HashMap<Object, Object>();
+ }
+
+ }
+
ServiceHelper.startService(aggregationRepository);
if (executorService == null) {
@@ -416,7 +426,10 @@
}
ServiceHelper.stopService(aggregationRepository);
- closedCorrelationKeys.clear();
+
+ if (closedCorrelationKeys != null) {
+ closedCorrelationKeys.clear();
+ }
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java Wed Feb 17 12:28:21 2010
@@ -27,7 +27,23 @@
*/
public class AggregateClosedCorrelationKeyTest extends ContextTestSupport {
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
public void testAggregateClosedCorrelationKey() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(2).closeCorrelationKeyOnCompletion(1000)
+ .to("mock:result");
+ }
+ });
+ context.start();
+
getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
template.sendBodyAndHeader("direct:start", "A", "id", 1);
@@ -46,17 +62,51 @@
assertMockEndpointsSatisfied();
}
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ public void testAggregateClosedCorrelationKeyCache() throws Exception {
+ context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
- .aggregate(header("id"), new BodyInAggregatingStrategy())
- .completionSize(2).closeCorrelationKeyOnCompletion()
+ .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .completionSize(2).closeCorrelationKeyOnCompletion(2)
.to("mock:result");
-
}
- };
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("A+B", "C+D", "E+F");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 1);
+ template.sendBodyAndHeader("direct:start", "B", "id", 1);
+ template.sendBodyAndHeader("direct:start", "C", "id", 2);
+ template.sendBodyAndHeader("direct:start", "D", "id", 2);
+ template.sendBodyAndHeader("direct:start", "E", "id", 3);
+ template.sendBodyAndHeader("direct:start", "F", "id", 3);
+
+ // should NOT be closed because only 2 and 3 is remembered as they are the two last used
+ template.sendBodyAndHeader("direct:start", "G", "id", 1);
+
+ // should be closed
+ try {
+ template.sendBodyAndHeader("direct:start", "H", "id", 2);
+ fail("Should throw an exception");
+ } catch (CamelExecutionException e) {
+ ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+ assertEquals(2, cause.getCorrelationKey());
+ assertEquals("The correlation key [2] has been closed. Exchange[Message: H]", cause.getMessage());
+ }
+
+ // should be closed
+ try {
+ template.sendBodyAndHeader("direct:start", "I", "id", 3);
+ fail("Should throw an exception");
+ } catch (CamelExecutionException e) {
+ ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+ assertEquals(3, cause.getCorrelationKey());
+ assertEquals("The correlation key [3] has been closed. Exchange[Message: I]", cause.getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
}
+
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 12:28:21 2010
@@ -320,7 +320,7 @@
AggregateProcessor ap = new AggregateProcessor(done, corr, as);
ap.setCompletionPredicate(complete);
- ap.setCloseCorrelationKeyOnCompletion(true);
+ ap.setCloseCorrelationKeyOnCompletion(1000);
ap.start();