You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/02/15 19:05:14 UTC

svn commit: r910280 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Author: davsclaus
Date: Mon Feb 15 18:05:14 2010
New Revision: 910280

URL: http://svn.apache.org/viewvc?rev=910280&view=rev
Log:
CAMEL-1686: Overhaul of aggregator.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910280&r1=910279&r2=910280&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Feb 15 18:05:14 2010
@@ -17,7 +17,9 @@
 package org.apache.camel.processor.aggregate;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -53,9 +55,14 @@
     private final Expression correlationExpression;
     private ExecutorService executorService;
     private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
+    private Set<Object> closedCorrelationKeys = new HashSet<Object>();
+
+    // options
+    private boolean ignoreBadCorrelationKeys;
+    private boolean closeCorrelationKeyOnCompletion;
 
     // different ways to have completion triggered
-    private boolean eagerEvaluateCompletionPredicate;
+    private boolean eagerCheckCompletion;
     private Predicate completionPredicate;
     private long completionTimeout;
     private int completionAggregatedSize;
@@ -91,7 +98,22 @@
         // compute correlation expression
         Object key = correlationExpression.evaluate(exchange, Object.class);
         if (ObjectHelper.isEmpty(key)) {
-            throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange);
+            // we have a bad correlation key
+            if (isIgnoreBadCorrelationKeys()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Correlation key could not be evaluated to a value. Exchange will be ignored: " + exchange);
+                }
+                return;
+            } else {
+                throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange);
+            }
+        }
+
+        // is the correlation key closed?
+        if (isCloseCorrelationKeyOnCompletion()) {
+            if (closedCorrelationKeys.contains(key)) {
+                throw new CamelExchangeException("Correlation key has been closed", exchange);
+            }
         }
 
         Exchange oldExchange = aggregationRepository.get(key);
@@ -104,9 +126,9 @@
             size++;
         }
 
-        // are we complete?
+        // check if we are complete
         boolean complete = false;
-        if (isEagerEvaluateCompletionPredicate()) {
+        if (isEagerCheckCompletion()) {
             complete = isCompleted(key, exchange, size);
         }
 
@@ -115,8 +137,9 @@
         newExchange = onAggregation(oldExchange, newExchange);
         newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
 
-        // if not set to evaluate eager then do that after the aggregation
-        if (!isEagerEvaluateCompletionPredicate()) {
+        // when not checking eagerly, check completion after the aggregation
+        if (!isEagerCheckCompletion()) {
+            // use the new aggregated exchange when testing
             complete = isCompleted(key, newExchange, size);
         }
 
@@ -169,6 +192,11 @@
             timeoutMap.remove(key);
         }
 
+        // this key has been closed so add it to the set of closed correlation keys
+        if (isCloseCorrelationKeyOnCompletion()) {
+            closedCorrelationKeys.add(key);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange);
         }
@@ -201,12 +229,12 @@
         this.completionPredicate = completionPredicate;
     }
 
-    public boolean isEagerEvaluateCompletionPredicate() {
-        return eagerEvaluateCompletionPredicate;
+    public boolean isEagerCheckCompletion() {
+        return eagerCheckCompletion;
     }
 
-    public void setEagerEvaluateCompletionPredicate(boolean eagerEvaluateCompletionPredicate) {
-        this.eagerEvaluateCompletionPredicate = eagerEvaluateCompletionPredicate;
+    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
+        this.eagerCheckCompletion = eagerCheckCompletion;
     }
 
     public long getCompletionTimeout() {
@@ -225,6 +253,22 @@
         this.completionAggregatedSize = completionAggregatedSize;
     }
 
+    public boolean isIgnoreBadCorrelationKeys() {
+        return ignoreBadCorrelationKeys;
+    }
+
+    public void setIgnoreBadCorrelationKeys(boolean ignoreBadCorrelationKeys) {
+        this.ignoreBadCorrelationKeys = ignoreBadCorrelationKeys;
+    }
+
+    public boolean isCloseCorrelationKeyOnCompletion() {
+        return closeCorrelationKeyOnCompletion;
+    }
+
+    public void setCloseCorrelationKeyOnCompletion(boolean closeCorrelationKeyOnCompletion) {
+        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
+    }
+
     /**
      * Background tasks that looks for aggregated exchanges which is triggered by completion timeouts.
      */
@@ -274,6 +318,7 @@
         }
 
         ServiceHelper.stopService(aggregationRepository);
+        closedCorrelationKeys.clear();
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910280&r1=910279&r2=910280&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Feb 15 18:05:14 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor.aggregator;
 
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -49,7 +50,7 @@
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
         ap.setCompletionPredicate(complete);
-        ap.setEagerEvaluateCompletionPredicate(false);
+        ap.setEagerCheckCompletion(false);
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
@@ -89,7 +90,7 @@
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
         ap.setCompletionPredicate(complete);
-        ap.setEagerEvaluateCompletionPredicate(true);
+        ap.setEagerCheckCompletion(true);
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
@@ -136,7 +137,7 @@
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
         ap.setCompletionAggregatedSize(3);
-        ap.setEagerEvaluateCompletionPredicate(eager);
+        ap.setEagerCheckCompletion(eager);
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
@@ -183,7 +184,7 @@
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
         ap.setCompletionTimeout(3000);
-        ap.setEagerEvaluateCompletionPredicate(eager);
+        ap.setEagerCheckCompletion(eager);
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
@@ -218,4 +219,138 @@
         ap.stop();
     }
 
+    public void testAggregateIgnoreBadCorrelationKey() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+C+END");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+        Predicate complete = body().contains("END");
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionPredicate(complete);
+        ap.setIgnoreBadCorrelationKeys(true);
+
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("END");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+    public void testAggregateBadCorrelationKey() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+C+END");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+        Predicate complete = body().contains("END");
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionPredicate(complete);
+
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("END");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+
+        try {
+            ap.process(e2);
+            fail("Should have thrown an exception");
+        } catch (CamelExchangeException e) {
+            assertEquals("Correlation key could not be evaluated to a value. Exchange[Message: B]", e.getMessage());
+        }
+
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+    public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+END");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+        Predicate complete = body().contains("END");
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionPredicate(complete);
+        ap.setCloseCorrelationKeyOnCompletion(true);
+
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("END");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("C");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+
+        try {
+            ap.process(e4);
+            fail("Should have thrown an exception");
+        } catch (CamelExchangeException e) {
+            assertEquals("Correlation key has been closed. Exchange[Message: C]", e.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
 }