You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/15 18:50:18 UTC
svn commit: r595374 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/
main/java/org/apache/camel/processor/aggregate/
test/java/org/apache/camel/processor/
Author: jstrachan
Date: Thu Nov 15 09:50:10 2007
New Revision: 595374
URL: http://svn.apache.org/viewvc?rev=595374&view=rev
Log:
applied patch for https://issues.apache.org/activemq/browse/CAMEL-207
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Nov 15 09:50:10 2007
@@ -63,6 +63,7 @@
public void run() {
LOG.debug("Starting thread for " + this);
while (isRunAllowed()) {
+ collection.clear();
try {
processBatch();
} catch (Exception e) {
@@ -116,7 +117,7 @@
protected synchronized void processBatch() throws Exception {
long start = System.currentTimeMillis();
long end = start + batchTimeout;
- for (int i = 0; isBatchCompleted(i); i++) {
+ for (int i = 0; !isBatchCompleted(i); i++) {
long timeout = end - System.currentTimeMillis();
Exchange exchange = consumer.receive(timeout);
@@ -135,7 +136,6 @@
Iterator<Exchange> iter = collection.iterator();
while (iter.hasNext()) {
Exchange exchange = iter.next();
- iter.remove();
processExchange(exchange);
}
}
@@ -144,7 +144,7 @@
* A strategy method to decide if the batch is completed the resulting exchanges should be sent
*/
protected boolean isBatchCompleted(int index) {
- return index < batchSize;
+ return index >= batchSize;
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Thu Nov 15 09:50:10 2007
@@ -55,8 +55,8 @@
// the strategy may just update the old exchange and return it
if (newExchange != oldExchange) {
map.put(correlationKey, newExchange);
- onAggregation(correlationKey, newExchange);
}
+ onAggregation(correlationKey, newExchange);
return true;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Thu Nov 15 09:50:10 2007
@@ -62,5 +62,6 @@
@Override
public void clear() {
collection.clear();
+ super.clear();
}
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=595374&r1=595373&r2=595374&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Thu Nov 15 09:50:10 2007
@@ -17,8 +17,10 @@
package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
* @version $Revision: 1.1 $
@@ -40,13 +42,43 @@
resultEndpoint.assertIsSatisfied();
}
+ public void testPredicate() throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+
+ resultEndpoint.expectedMessageCount(messageCount / 5);
+ // lets send a large batch of messages
+ for (int i = 1; i <= messageCount; i++) {
+ String body = "message:" + i;
+ template.sendBodyAndHeader("direct:predicate", body, "cheese", 123);
+ }
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
from("direct:start").aggregator(header("cheese")).to("mock:result");
+
+ from("direct:predicate").aggregator(header("cheese"), new UseLatestAggregationStrategy() {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ Exchange result = super.aggregate(oldExchange, newExchange);
+ Integer old = (Integer) oldExchange.getProperty("aggregated");
+ if (old == null) {
+ old = 1;
+ }
+ result.setProperty("aggregated", old + 1);
+ return result;
+ }
+
+ }).
+ completedPredicate(header("aggregated").
+ isEqualTo(5)).to("mock:result");
// END SNIPPET: ex
}
};
}
-}
\ No newline at end of file
+}