You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/11 09:15:43 UTC
svn commit: r932842 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
Author: davsclaus
Date: Sun Apr 11 07:15:43 2010
New Revision: 932842
URL: http://svn.apache.org/viewvc?rev=932842&view=rev
Log:
CAMEL-2629: Fixed completeBatchConsumer when multiple correlation keys was used
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=932842&r1=932841&r2=932842&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 11 07:15:43 2010
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggre
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -86,6 +87,7 @@ public class AggregateProcessor extends
private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
private Map<Object, Object> closedCorrelationKeys;
+ private Set<Object> batchConsumerCorrelationKeys = new LinkedHashSet<Object>();
private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
// optional dead letter channel for exhausted recovered exchanges
@@ -233,9 +235,19 @@ public class AggregateProcessor extends
}
aggregationRepository.add(exchange.getContext(), key, answer);
} else {
- // TODO: if we are completed from batch consumer then they should all complete (trigger that like timeout map)
- answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- onCompletion(key, answer, false);
+ // if batch consumer completion is enabled then we need to complete the group
+ if ("consumer".equals(complete)) {
+ for (Object batchKey : batchConsumerCorrelationKeys) {
+ Exchange batchAnswer = aggregationRepository.get(camelContext, batchKey);
+ batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+ onCompletion(batchKey, batchAnswer, false);
+ }
+ batchConsumerCorrelationKeys.clear();
+ } else {
+ // we are complete for this exchange
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+ onCompletion(key, answer, false);
+ }
}
if (LOG.isTraceEnabled()) {
@@ -300,6 +312,7 @@ public class AggregateProcessor extends
}
if (isCompletionFromBatchConsumer()) {
+ batchConsumerCorrelationKeys.add(key);
batchConsumerCounter.incrementAndGet();
int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
if (size > 0 && batchConsumerCounter.intValue() >= size) {
@@ -710,6 +723,7 @@ public class AggregateProcessor extends
if (closedCorrelationKeys != null) {
closedCorrelationKeys.clear();
}
+ batchConsumerCorrelationKeys.clear();
redeliveryState.clear();
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java?rev=932842&r1=932841&r2=932842&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java Sun Apr 11 07:15:43 2010
@@ -32,13 +32,7 @@ public class FileConcurrentAggregateBatc
private static final Log LOG = LogFactory.getLog(FileConcurrentAggregateBatchConsumerTest.class);
- // TODO: batchConsumer needs to be reworked
-
- public void testDummy() {
- // noop
- }
-
- public void xxxTestProcessFilesConcurrently() throws Exception {
+ public void testProcessFilesConcurrently() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -83,7 +77,7 @@ public class FileConcurrentAggregateBatc
}
}
- public void xxxTestProcessFilesSequentiel() throws Exception {
+ public void testProcessFilesSequentiel() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {