You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/17 13:28:21 UTC

svn commit: r910948 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Wed Feb 17 12:28:21 2010
New Revision: 910948

URL: http://svn.apache.org/viewvc?rev=910948&view=rev
Log:
CAMEL-1686: Overhaul of aggregator. Work in progress.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 12:28:21 2010
@@ -77,7 +77,7 @@
     @XmlAttribute(required = false)
     private Boolean ignoreBadCorrelationKeys;
     @XmlAttribute(required = false)
-    private Boolean closeCorrelationKeyOnCompletion;
+    private Integer closeCorrelationKeyOnCompletion;
 
     public AggregateDefinition() {
     }
@@ -159,15 +159,15 @@
         if (isCompletionFromBatchConsumer() != null) {
             answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
         }
-        if (isCloseCorrelationKeyOnCompletion() != null) {
-            answer.setCloseCorrelationKeyOnCompletion(isCloseCorrelationKeyOnCompletion());
-        }
         if (isEagerCheckCompletion() != null) {
             answer.setEagerCheckCompletion(isEagerCheckCompletion());
         }
         if (isIgnoreBadCorrelationKeys() != null) {
             answer.setIgnoreBadCorrelationKeys(isIgnoreBadCorrelationKeys());
         }
+        if (getCloseCorrelationKeyOnCompletion() != null) {
+            answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
+        }
 
         return answer;
     }
@@ -298,11 +298,11 @@
         this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
     }
 
-    public Boolean isCloseCorrelationKeyOnCompletion() {
+    public Integer getCloseCorrelationKeyOnCompletion() {
         return closeCorrelationKeyOnCompletion;
     }
 
-    public void setCloseCorrelationKeyOnCompletion(Boolean closeCorrelationKeyOnCompletion) {
+    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
         this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
     }
 
@@ -336,10 +336,12 @@
      * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
      * is thrown.
      *
+     * @param capacity the maximum capacity of the closed correlation key cache.
+     *                 Use <tt>0</tt> or a negative value for unbounded capacity.
      * @return builder
      */
-    public AggregateDefinition closeCorrelationKeyOnCompletion() {
-        setCloseCorrelationKeyOnCompletion(true);
+    public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
+        setCloseCorrelationKeyOnCompletion(capacity);
         return this;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 12:28:21 2010
@@ -17,9 +17,9 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -35,6 +35,7 @@
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.TimeoutMap;
@@ -71,11 +72,11 @@
     private ExecutorService executorService;
     private ExceptionHandler exceptionHandler;
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
-    private Set<Object> closedCorrelationKeys = new HashSet<Object>();
+    private Map<Object, Object> closedCorrelationKeys;
 
     // options
     private boolean ignoreBadCorrelationKeys;
-    private boolean closeCorrelationKeyOnCompletion;
+    private Integer closeCorrelationKeyOnCompletion;
     private boolean parallelProcessing;
 
     // different ways to have completion triggered
@@ -133,10 +134,8 @@
         }
 
         // is the correlation key closed?
-        if (isCloseCorrelationKeyOnCompletion()) {
-            if (closedCorrelationKeys.contains(key)) {
-                throw new ClosedCorrelationKeyException(key, exchange);
-            }
+        if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
+            throw new ClosedCorrelationKeyException(key, exchange);
         }
 
         doAggregation(key, exchange);
@@ -250,8 +249,8 @@
         }
 
         // this key has been closed so add it to the closed map
-        if (isCloseCorrelationKeyOnCompletion()) {
-            closedCorrelationKeys.add(key);
+        if (closedCorrelationKeys != null) {
+            closedCorrelationKeys.put(key, key);
         }
 
         if (LOG.isDebugEnabled()) {
@@ -326,11 +325,11 @@
         this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
     }
 
-    public boolean isCloseCorrelationKeyOnCompletion() {
+    public Integer getCloseCorrelationKeyOnCompletion() {
         return closeCorrelationKeyOnCompletion;
     }
 
-    public void setCloseCorrelationKeyOnCompletion(boolean closeCorrelationKeyOnCompletion) {
+    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
         this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
     }
 
@@ -386,6 +385,17 @@
                     + " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
         }
 
+        if (getCloseCorrelationKeyOnCompletion() != null) {
+            if (getCloseCorrelationKeyOnCompletion() > 0) {
+                LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
+                closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion());
+            } else {
+                LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
+                closedCorrelationKeys = new HashMap<Object, Object>();
+            }
+
+        }
+
         ServiceHelper.startService(aggregationRepository);
 
         if (executorService == null) {
@@ -416,7 +426,10 @@
         }
 
         ServiceHelper.stopService(aggregationRepository);
-        closedCorrelationKeys.clear();
+
+        if (closedCorrelationKeys != null) {
+            closedCorrelationKeys.clear();
+        }
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java Wed Feb 17 12:28:21 2010
@@ -27,7 +27,23 @@
  */
 public class AggregateClosedCorrelationKeyTest extends ContextTestSupport {
 
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
     public void testAggregateClosedCorrelationKey() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(2).closeCorrelationKeyOnCompletion(1000)
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
         getMockEndpoint("mock:result").expectedBodiesReceived("A+B");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 1);
@@ -46,17 +62,51 @@
         assertMockEndpointsSatisfied();
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
+    public void testAggregateClosedCorrelationKeyCache() throws Exception {
+        context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .aggregate(header("id"), new BodyInAggregatingStrategy())
-                        .completionSize(2).closeCorrelationKeyOnCompletion()
+                        .aggregate(header("id"), new BodyInAggregatingStrategy())
+                        .completionSize(2).closeCorrelationKeyOnCompletion(2)
                         .to("mock:result");
-
             }
-        };
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B", "C+D", "E+F");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 1);
+        template.sendBodyAndHeader("direct:start", "B", "id", 1);
+        template.sendBodyAndHeader("direct:start", "C", "id", 2);
+        template.sendBodyAndHeader("direct:start", "D", "id", 2);
+        template.sendBodyAndHeader("direct:start", "E", "id", 3);
+        template.sendBodyAndHeader("direct:start", "F", "id", 3);
+
+        // should NOT be closed because only keys 2 and 3 are remembered, as they are the two most recently used
+        template.sendBodyAndHeader("direct:start", "G", "id", 1);
+
+        // should be closed
+        try {
+            template.sendBodyAndHeader("direct:start", "H", "id", 2);
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+            assertEquals(2, cause.getCorrelationKey());
+            assertEquals("The correlation key [2] has been closed. Exchange[Message: H]", cause.getMessage());
+        }
+
+        // should be closed
+        try {
+            template.sendBodyAndHeader("direct:start", "I", "id", 3);
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            ClosedCorrelationKeyException cause = assertIsInstanceOf(ClosedCorrelationKeyException.class, e.getCause());
+            assertEquals(3, cause.getCorrelationKey());
+            assertEquals("The correlation key [3] has been closed. Exchange[Message: I]", cause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
     }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910948&r1=910947&r2=910948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 12:28:21 2010
@@ -320,7 +320,7 @@
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
         ap.setCompletionPredicate(complete);
-        ap.setCloseCorrelationKeyOnCompletion(true);
+        ap.setCloseCorrelationKeyOnCompletion(1000);
 
         ap.start();