You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/15 18:50:18 UTC

svn commit: r595374 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/

Author: jstrachan
Date: Thu Nov 15 09:50:10 2007
New Revision: 595374

URL: http://svn.apache.org/viewvc?rev=595374&view=rev
Log:
applied patch for https://issues.apache.org/activemq/browse/CAMEL-207

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Nov 15 09:50:10 2007
@@ -63,6 +63,7 @@
     public void run() {
         LOG.debug("Starting thread for " + this);
         while (isRunAllowed()) {
+            collection.clear();
             try {
                 processBatch();
             } catch (Exception e) {
@@ -116,7 +117,7 @@
     protected synchronized void processBatch() throws Exception {
         long start = System.currentTimeMillis();
         long end = start + batchTimeout;
-        for (int i = 0; isBatchCompleted(i); i++) {
+        for (int i = 0; !isBatchCompleted(i); i++) {
             long timeout = end - System.currentTimeMillis();
 
             Exchange exchange = consumer.receive(timeout);
@@ -135,7 +136,6 @@
         Iterator<Exchange> iter = collection.iterator();
         while (iter.hasNext()) {
             Exchange exchange = iter.next();
-            iter.remove();
             processExchange(exchange);
         }
     }
@@ -144,7 +144,7 @@
      * A strategy method to decide if the batch is completed the resulting exchanges should be sent
      */
     protected boolean isBatchCompleted(int index) {
-        return index < batchSize;
+        return index >= batchSize;
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Thu Nov 15 09:50:10 2007
@@ -55,8 +55,8 @@
         // the strategy may just update the old exchange and return it
         if (newExchange != oldExchange) {
             map.put(correlationKey, newExchange);
-            onAggregation(correlationKey, newExchange);
         }
+        onAggregation(correlationKey, newExchange);
         return true;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Thu Nov 15 09:50:10 2007
@@ -62,5 +62,6 @@
     @Override
     public void clear() {
         collection.clear();
+        super.clear();
     }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Thu Nov 15 09:50:10 2007
@@ -17,8 +17,10 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * @version $Revision: 1.1 $
@@ -40,13 +42,43 @@
         resultEndpoint.assertIsSatisfied();
     }
 
+    public void testPredicate() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+
+        resultEndpoint.expectedMessageCount(messageCount / 5);
+        // let's send a large batch of messages
+        for (int i = 1; i <= messageCount; i++) {
+            String body = "message:" + i;
+            template.sendBodyAndHeader("direct:predicate", body, "cheese", 123);
+        }
+
+        resultEndpoint.assertIsSatisfied();        
+    }
+    
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
                 from("direct:start").aggregator(header("cheese")).to("mock:result");
+                
+                from("direct:predicate").aggregator(header("cheese"), new UseLatestAggregationStrategy() {
+                
+                    @Override
+                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                        Exchange result = super.aggregate(oldExchange, newExchange);
+                        Integer old = (Integer) oldExchange.getProperty("aggregated");
+                        if (old == null) {
+                            old = 1;
+                        }
+                        result.setProperty("aggregated", old + 1);
+                        return result;
+                    }
+                
+                }).
+                    completedPredicate(header("aggregated").
+                    isEqualTo(5)).to("mock:result");
                 // END SNIPPET: ex
             }
         };
     }
-}
\ No newline at end of file
+}