You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/08 11:20:21 UTC
svn commit: r931845 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/test/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/processor/aggregator/
components/camel-spring/src/test/...
Author: davsclaus
Date: Thu Apr 8 09:20:19 2010
New Revision: 931845
URL: http://svn.apache.org/viewvc?rev=931845&view=rev
Log:
CAMEL-2568: Polished aggregator and fixed a timeout in some rare cases could send the last and not aggregated exchange.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java
- copied, changed from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (contents, props changed)
- copied, changed from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java
Removed:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=931845&r1=931844&r2=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Apr 8 09:20:19 2010
@@ -81,7 +81,8 @@ public class AggregateProcessor extends
private final Expression correlationExpression;
private final ExecutorService executorService;
private ScheduledExecutorService recoverService;
- private TimeoutMap<Object, Exchange> timeoutMap;
+ // store correlation key -> exchange id in timeout map
+ private TimeoutMap<Object, String> timeoutMap;
private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
private Map<Object, Object> closedCorrelationKeys;
@@ -203,7 +204,7 @@ public class AggregateProcessor extends
}
// check if we are complete
- boolean complete = false;
+ String complete = null;
if (isEagerCheckCompletion()) {
// put the current aggregated size on the exchange so its avail during completion check
newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
@@ -224,14 +225,13 @@ public class AggregateProcessor extends
}
// only need to update aggregation repository if we are not complete
- if (!complete) {
+ if (complete == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key);
}
aggregationRepository.add(exchange.getContext(), key, answer);
- }
-
- if (complete) {
+ } else {
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
onCompletion(key, answer, false);
}
@@ -242,12 +242,18 @@ public class AggregateProcessor extends
return answer;
}
- protected boolean isCompleted(Object key, Exchange exchange) {
+ /**
+ * Tests whether the given exchange is complete or not
+ *
+ * @param key the correlation key
+ * @param exchange the incoming exchange
+ * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
+ */
+ protected String isCompleted(Object key, Exchange exchange) {
if (getCompletionPredicate() != null) {
boolean answer = getCompletionPredicate().matches(exchange);
if (answer) {
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "predicate");
- return true;
+ return "predicate";
}
}
@@ -256,16 +262,14 @@ public class AggregateProcessor extends
if (value != null && value > 0) {
int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
if (size >= value) {
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size");
- return true;
+ return "size";
}
}
}
if (getCompletionSize() > 0) {
int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
if (size >= getCompletionSize()) {
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "size");
- return true;
+ return "size";
}
}
@@ -279,7 +283,7 @@ public class AggregateProcessor extends
LOG.trace("Updating correlation key " + key + " to timeout after "
+ value + " ms. as exchange received: " + exchange);
}
- timeoutMap.put(key, exchange, value);
+ timeoutMap.put(key, exchange.getExchangeId(), value);
timeoutSet = true;
}
}
@@ -289,7 +293,7 @@ public class AggregateProcessor extends
LOG.trace("Updating correlation key " + key + " to timeout after "
+ getCompletionTimeout() + " ms. as exchange received: " + exchange);
}
- timeoutMap.put(key, exchange, getCompletionTimeout());
+ timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout());
}
if (isCompletionFromBatchConsumer()) {
@@ -298,12 +302,12 @@ public class AggregateProcessor extends
if (size > 0 && batchConsumerCounter.intValue() >= size) {
// batch consumer is complete then reset the counter
batchConsumerCounter.set(0);
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "consumer");
- return true;
+ return "consumer";
}
}
- return false;
+ // not complete
+ return null;
}
protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
@@ -498,23 +502,27 @@ public class AggregateProcessor extends
/**
* Background task that looks for aggregated exchanges which is triggered by completion timeouts.
*/
- private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> {
+ private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, String> {
private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
super(executor, requestMapPollTimeMillis);
}
@Override
- public void onEviction(Object key, Exchange exchange) {
+ public void onEviction(Object key, String exchangeId) {
if (log.isDebugEnabled()) {
log.debug("Completion timeout triggered for correlation key: " + key);
}
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
+ // get the aggregated exchange
+ Exchange answer = aggregationRepository.get(camelContext, key);
+
+ // indicate it was completed by timeout
+ answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
try {
lock.lock();
- onCompletion(key, exchange, true);
+ onCompletion(key, answer, true);
} finally {
lock.unlock();
}
@@ -580,12 +588,7 @@ public class AggregateProcessor extends
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
// resubmit the recovered exchange
- try {
- lock.lock();
- onSubmitCompletion(key, exchange);
- } finally {
- lock.unlock();
- }
+ onSubmitCompletion(key, exchange);
}
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=931845&r1=931844&r2=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Thu Apr 8 09:20:19 2010
@@ -23,12 +23,14 @@ import org.apache.camel.processor.aggreg
public class BodyInAggregatingStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- if (oldExchange != null) {
- String oldBody = oldExchange.getIn().getBody(String.class);
- String newBody = newExchange.getIn().getBody(String.class);
- newExchange.getIn().setBody(oldBody + "+" + newBody);
+ if (oldExchange == null) {
+ return newExchange;
}
- return newExchange;
+
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(oldBody + "+" + newBody);
+ return oldExchange;
}
/**
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java?rev=931845&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java Thu Apr 8 09:20:19 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for aggregate grouped exchanges completed by size
+ */
+public class AggregateGroupedExchangeSizePredicateTest extends ContextTestSupport {
+
+ public void testGroupedSize() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 2 messages since we group by size (3 and 2)
+ result.expectedMessageCount(2);
+
+ template.sendBodyAndHeader("direct:start", "100", "groupSize", 3);
+ template.sendBodyAndHeader("direct:start", "150", "groupSize", 3);
+ template.sendBodyAndHeader("direct:start", "130", "groupSize", 3);
+ template.sendBodyAndHeader("direct:start", "200", "groupSize", 2);
+ template.sendBodyAndHeader("direct:start", "190", "groupSize", 2);
+
+ assertMockEndpointsSatisfied();
+
+ Exchange out = result.getExchanges().get(0);
+ List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+ assertEquals(3, grouped.size());
+ assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+ assertEquals("130", grouped.get(2).getIn().getBody(String.class));
+
+ out = result.getExchanges().get(1);
+ grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+ assertEquals(2, grouped.size());
+
+ assertEquals("200", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("190", grouped.get(1).getIn().getBody(String.class));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:start")
+ // must use eagerCheckCompletion so we can check the groupSize header on the incoming exchange
+ .aggregate().constant(true).groupExchanges().eagerCheckCompletion().completionSize(header("groupSize"))
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizePredicateTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java (from r931784, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=931784&r2=931845&rev=931845&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeSizeTest.java Thu Apr 8 09:20:19 2010
@@ -24,55 +24,50 @@ import org.apache.camel.builder.RouteBui
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for aggregate grouped exchanges.
+ * Unit test for aggregate grouped exchanges completed by size
*/
-public class AggregateGroupedExchangeTest extends ContextTestSupport {
+public class AggregateGroupedExchangeSizeTest extends ContextTestSupport {
- public void testGrouped() throws Exception {
- // START SNIPPET: e2
+ public void testGroupedSize() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
- // we expect 1 messages since we group all we get in using the same correlation key
- result.expectedMessageCount(1);
+ // we expect 2 messages since we group by size (3 and 3)
+ result.expectedMessageCount(2);
- // then we sent all the message at once
template.sendBody("direct:start", "100");
template.sendBody("direct:start", "150");
template.sendBody("direct:start", "130");
template.sendBody("direct:start", "200");
template.sendBody("direct:start", "190");
+ template.sendBody("direct:start", "120");
assertMockEndpointsSatisfied();
Exchange out = result.getExchanges().get(0);
List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-
- assertEquals(5, grouped.size());
-
+ assertEquals(3, grouped.size());
assertEquals("100", grouped.get(0).getIn().getBody(String.class));
assertEquals("150", grouped.get(1).getIn().getBody(String.class));
assertEquals("130", grouped.get(2).getIn().getBody(String.class));
- assertEquals("200", grouped.get(3).getIn().getBody(String.class));
- assertEquals("190", grouped.get(4).getIn().getBody(String.class));
- // END SNIPPET: e2
+
+ out = result.getExchanges().get(1);
+ grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+ assertEquals(3, grouped.size());
+
+ assertEquals("200", grouped.get(0).getIn().getBody(String.class));
+ assertEquals("190", grouped.get(1).getIn().getBody(String.class));
+ assertEquals("120", grouped.get(2).getIn().getBody(String.class));
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
- // aggregate all using same expression
- .aggregate().constant(true)
- // wait for 0.5 seconds to aggregate
- .completionTimeout(500L)
- // group the exchanges so we get one single exchange containing all the others
- .groupExchanges()
- .to("mock:result");
- // END SNIPPET: e1
+ .aggregate().constant(true).groupExchanges().completionSize(3)
+ .to("mock:result")
+ .end();
}
};
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (from r931784, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java&r1=931784&r2=931845&rev=931845&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java Thu Apr 8 09:20:19 2010
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.spring.processor;
+package org.apache.camel.spring.processor.aggregator;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.AggregatorTest;
+import org.apache.camel.processor.aggregator.AggregatorTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date