You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/19 11:33:56 UTC
svn commit: r935492 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/main/java/org/apache/camel/spi/
camel-core/src/test/java/org/apache/camel/processor/ag...
Author: davsclaus
Date: Mon Apr 19 09:33:55 2010
New Revision: 935492
URL: http://svn.apache.org/viewvc?rev=935492&view=rev
Log:
CAMEL-2656: Added completionInterval to Aggregator.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java
- copied, changed from r935469, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java
- copied, changed from r935469, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml
- copied, changed from r935469, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Mon Apr 19 09:33:55 2010
@@ -77,6 +77,8 @@ public class AggregateDefinition extends
@XmlAttribute
private Integer completionSize;
@XmlAttribute
+ private Long completionInterval;
+ @XmlAttribute
private Long completionTimeout;
@XmlAttribute
private Boolean completionFromBatchConsumer;
@@ -180,6 +182,9 @@ public class AggregateDefinition extends
if (getCompletionTimeout() != null) {
answer.setCompletionTimeout(getCompletionTimeout());
}
+ if (getCompletionInterval() != null) {
+ answer.setCompletionInterval(getCompletionInterval());
+ }
if (getCompletionSizeExpression() != null) {
Expression expression = getCompletionSizeExpression().createExpression(routeContext);
answer.setCompletionSizeExpression(expression);
@@ -258,6 +263,14 @@ public class AggregateDefinition extends
this.completionSize = completionSize;
}
+ /**
+ * Gets the completion interval.
+ *
+ * @return the interval in millis, or <tt>null</tt> if no interval has been set
+ */
+ public Long getCompletionInterval() {
+ return completionInterval;
+ }
+
+ /**
+ * Sets the completion interval.
+ *
+ * @param completionInterval the interval in millis
+ */
+ public void setCompletionInterval(Long completionInterval) {
+ this.completionInterval = completionInterval;
+ }
+
public Long getCompletionTimeout() {
return completionTimeout;
}
@@ -454,6 +467,18 @@ public class AggregateDefinition extends
}
/**
+ * Sets the completion interval. Each time the interval elapses, the aggregator considers
+ * all the groups it currently holds as complete and sends out their aggregated exchanges.
+ * <p/>
+ * Only one of completionInterval or completionTimeout can be used, not both.
+ *
+ * @param completionInterval the interval in millis
+ * @return the builder
+ */
+ public AggregateDefinition completionInterval(long completionInterval) {
+ setCompletionInterval(completionInterval);
+ return this;
+ }
+
+ /**
* Sets the completion timeout, which would cause the aggregate to consider the group as complete
* and send out the aggregated exchange.
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Apr 19 09:33:55 2010
@@ -109,6 +109,7 @@ public class AggregateProcessor extends
private Predicate completionPredicate;
private long completionTimeout;
private Expression completionTimeoutExpression;
+ private long completionInterval;
private int completionSize;
private Expression completionSizeExpression;
private boolean completionFromBatchConsumer;
@@ -420,6 +421,14 @@ public class AggregateProcessor extends
this.completionTimeoutExpression = completionTimeoutExpression;
}
+ /**
+ * Gets the completion interval.
+ *
+ * @return the interval in millis; a value of zero or less means the interval is not in use
+ */
+ public long getCompletionInterval() {
+ return completionInterval;
+ }
+
+ /**
+ * Sets the completion interval. A value greater than zero makes the processor schedule
+ * a background task that triggers completion at that fixed rate.
+ * <p/>
+ * Must not be combined with a completion timeout greater than zero; the processor
+ * rejects that combination with an {@link IllegalArgumentException}.
+ *
+ * @param completionInterval the interval in millis
+ */
+ public void setCompletionInterval(long completionInterval) {
+ this.completionInterval = completionInterval;
+ }
+
public int getCompletionSize() {
return completionSize;
}
@@ -568,6 +577,46 @@ public class AggregateProcessor extends
}
/**
+ * Background task that triggers completion based on interval.
+ * <p/>
+ * On each run, every correlation key currently returned by the aggregation repository
+ * is completed, and its exchange is marked as completed by <tt>interval</tt>.
+ * The run is skipped while the CamelContext has not been started.
+ */
+ private final class AggregationIntervalTask implements Runnable {
+
+ public void run() {
+ // only run if CamelContext has been fully started
+ if (!camelContext.getStatus().isStarted()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completion interval task cannot start due CamelContext(" + camelContext.getName() + ") has not been started yet");
+ }
+ return;
+ }
+
+ LOG.trace("Starting completion interval task");
+
+ // acquire the lock before entering the try block, so the finally block
+ // only ever unlocks a lock which this thread actually holds
+ lock.lock();
+ try {
+ // trigger completion for all in the repository
+ Set<String> keys = aggregationRepository.getKeys();
+ for (String key : keys) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completion interval triggered for correlation key: " + key);
+ }
+ Exchange exchange = aggregationRepository.get(camelContext, key);
+
+ // indicate it was completed by interval
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+ onCompletion(key, exchange, false);
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ LOG.trace("Completion interval task complete");
+ }
+ }
+
+ /**
* Background task that looks for aggregated exchanges to recover.
*/
private final class RecoverTask implements Runnable {
@@ -670,11 +719,11 @@ public class AggregateProcessor extends
@Override
protected void doStart() throws Exception {
- if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+ if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
&& !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
&& getCompletionSizeExpression() == null) {
throw new IllegalStateException("At least one of the completions options"
- + " [completionTimeout, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+ + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
}
if (getCloseCorrelationKeyOnCompletion() != null) {
@@ -717,8 +766,19 @@ public class AggregateProcessor extends
}
}
+ if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
+ throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
+ }
+ if (getCompletionInterval() > 0) {
+ LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
+ ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
+ // trigger completion based on interval
+ scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+ }
+
// start timeout service if its in use
if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
+ // log the configured timeout value, as this branch starts the timeout service
+ LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
// check for timed out aggregated messages once every second
timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Mon Apr 19 09:33:55 2010
@@ -16,7 +16,9 @@
*/
package org.apache.camel.processor.aggregate;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.CamelContext;
@@ -48,6 +50,11 @@ public class MemoryAggregationRepository
// noop
}
+ public Set<String> getKeys() {
+ // hand out a read-only view, so callers cannot edit the repository through the set
+ Set<String> keys = cache.keySet();
+ return Collections.unmodifiableSet(keys);
+ }
+
@Override
protected void doStart() throws Exception {
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java Mon Apr 19 09:33:55 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.spi;
+import java.util.Set;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -65,4 +67,11 @@ public interface AggregationRepository {
*/
void confirm(CamelContext camelContext, String exchangeId);
+ /**
+ * Gets the keys currently in the repository.
+ * <p/>
+ * Callers should treat the returned set as read-only; the in-memory
+ * implementation returns an unmodifiable set.
+ *
+ * @return the keys
+ */
+ Set<String> getKeys();
+
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java (from r935469, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java Mon Apr 19 09:33:55 2010
@@ -22,19 +22,20 @@ import org.apache.camel.component.mock.M
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
/**
- * Unit test to verify that aggregate by timeout only also works.
- *
+ * Unit test to verify that aggregate by interval only also works.
+ *
* @version $Revision$
*/
-public class AggregateTimeoutOnlyTest extends ContextTestSupport {
+public class AggregateCompletionIntervalTest extends ContextTestSupport {
- public void testAggregateTimeoutOnly() throws Exception {
+ public void testAggregateInterval() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
// by default the use latest aggregation strategy is used so we get message 9
result.expectedBodiesReceived("Message 9");
- // should take 3 seconds to complete this one
- result.setMinimumResultWaitTime(2500);
+ // ensure messages are sent after the 1s initial delay of the interval task
+ Thread.sleep(2000);
+
for (int i = 0; i < 10; i++) {
template.sendBodyAndHeader("direct:start", "Message " + i, "id", "1");
}
@@ -49,11 +50,12 @@ public class AggregateTimeoutOnlyTest ex
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
- // aggregate every 3th second and disable the batch size so we set it to 0
- .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).completionSize(0)
+ .aggregate(header("id"), new UseLatestAggregationStrategy())
+ // trigger completion every 5th second
+ .completionInterval(5000)
.to("mock:result");
// END SNIPPET: e1
}
};
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Apr 19 09:33:55 2010
@@ -236,6 +236,50 @@ public class AggregateProcessorTest exte
ap.stop();
}
+ public void testAggregateCompletionInterval() throws Exception {
+ // the interval task skips its run unless the camel context has been started
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("A+B+C", "D");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+ Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+ Expression corr = header("id");
+ AggregationStrategy as = new BodyInAggregatingStrategy();
+
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+ ap.setCompletionInterval(3000);
+ ap.start();
+
+ // prepare the four exchanges up front, all sharing the same correlation id
+ String[] bodies = {"A", "B", "C", "D"};
+ Exchange[] exchanges = new Exchange[bodies.length];
+ for (int i = 0; i < bodies.length; i++) {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody(bodies[i]);
+ exchange.getIn().setHeader("id", 123);
+ exchanges[i] = exchange;
+ }
+
+ // A, B and C are sent back to back
+ for (int i = 0; i < 3; i++) {
+ ap.process(exchanges[i]);
+ }
+
+ // wait before sending D, so the interval task has run in between
+ Thread.sleep(5000);
+ ap.process(exchanges[3]);
+
+ assertMockEndpointsSatisfied();
+
+ ap.stop();
+ }
+
public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("A+C+END");
Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java (from r935469, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java Mon Apr 19 09:33:55 2010
@@ -17,17 +17,17 @@
package org.apache.camel.spring.processor.aggregator;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest;
+import org.apache.camel.processor.aggregator.AggregateCompletionIntervalTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
* @version $Revision$
*/
-public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest {
+public class SpringAggregateCompletionIntervalTest extends AggregateCompletionIntervalTest {
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml");
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml");
}
-}
+}
\ No newline at end of file
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml (from r935469, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml Mon Apr 19 09:33:55 2010
@@ -26,16 +26,17 @@
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
+ <!-- trigger completion of all current aggregated exchanges every 5th second -->
+ <aggregate strategyRef="aggregatorStrategy" completionInterval="5000">
<correlationExpression>
<simple>header.id</simple>
</correlationExpression>
- <to uri="mock:aggregated"/>
+ <to uri="mock:result"/>
</aggregate>
</route>
</camelContext>
- <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+ <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
<!-- END SNIPPET: e1 -->
</beans>