You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/11 09:15:43 UTC

svn commit: r932842 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java

Author: davsclaus
Date: Sun Apr 11 07:15:43 2010
New Revision: 932842

URL: http://svn.apache.org/viewvc?rev=932842&view=rev
Log:
CAMEL-2629: Fixed completeBatchConsumer when multiple correlation keys was used

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=932842&r1=932841&r2=932842&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 11 07:15:43 2010
@@ -19,6 +19,7 @@ package org.apache.camel.processor.aggre
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,6 +87,7 @@ public class AggregateProcessor extends 
     private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
     private Map<Object, Object> closedCorrelationKeys;
+    private Set<Object> batchConsumerCorrelationKeys = new LinkedHashSet<Object>();
     private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
     private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
     // optional dead letter channel for exhausted recovered exchanges
@@ -233,9 +235,19 @@ public class AggregateProcessor extends 
             }
             aggregationRepository.add(exchange.getContext(), key, answer);
         } else {
-            // TODO: if we are completed from batch consumer then they should all complete (trigger that like timeout map)
-            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-            onCompletion(key, answer, false);
+            // if batch consumer completion is enabled then we need to complete the group
+            if ("consumer".equals(complete)) {
+                for (Object batchKey : batchConsumerCorrelationKeys) {
+                    Exchange batchAnswer = aggregationRepository.get(camelContext, batchKey);
+                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+                    onCompletion(batchKey, batchAnswer, false);
+                }
+                batchConsumerCorrelationKeys.clear();
+            } else {
+                // we are complete for this exchange
+                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+                onCompletion(key, answer, false);
+            }
         }
 
         if (LOG.isTraceEnabled()) {
@@ -300,6 +312,7 @@ public class AggregateProcessor extends 
         }
 
         if (isCompletionFromBatchConsumer()) {
+            batchConsumerCorrelationKeys.add(key);
             batchConsumerCounter.incrementAndGet();
             int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
             if (size > 0 && batchConsumerCounter.intValue() >= size) {
@@ -710,6 +723,7 @@ public class AggregateProcessor extends 
         if (closedCorrelationKeys != null) {
             closedCorrelationKeys.clear();
         }
+        batchConsumerCorrelationKeys.clear();
         redeliveryState.clear();
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java?rev=932842&r1=932841&r2=932842&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java Sun Apr 11 07:15:43 2010
@@ -32,13 +32,7 @@ public class FileConcurrentAggregateBatc
 
     private static final Log LOG = LogFactory.getLog(FileConcurrentAggregateBatchConsumerTest.class);
 
-    // TODO: batchConsumer needs to be reworked
-
-    public void testDummy() {
-        // noop
-    }
-
-    public void xxxTestProcessFilesConcurrently() throws Exception {
+    public void testProcessFilesConcurrently() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -83,7 +77,7 @@ public class FileConcurrentAggregateBatc
         }
     }
 
-    public void xxxTestProcessFilesSequentiel() throws Exception {
+    public void testProcessFilesSequentiel() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {