You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2011/09/21 21:45:45 UTC
svn commit: r1173810 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/aggregate/
camel-core/src/test/java/org/apache/camel/processor/aggregator/
components/camel-spring/src/test/jav...
Author: boday
Date: Wed Sep 21 19:45:44 2011
New Revision: 1173810
URL: http://svn.apache.org/viewvc?rev=1173810&view=rev
Log:
CAMEL-4097 updated Aggregator to support forcing completion of groups when context is stopped
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1173810&r1=1173809&r2=1173810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Sep 21 19:45:44 2011
@@ -93,6 +93,8 @@ public class AggregateDefinition extends
private Integer closeCorrelationKeyOnCompletion;
@XmlAttribute
private Boolean discardOnCompletionTimeout;
+ @XmlAttribute
+ private Boolean forceCompletionOnStop;
public AggregateDefinition() {
}
@@ -209,6 +211,9 @@ public class AggregateDefinition extends
if (getDiscardOnCompletionTimeout() != null) {
answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
}
+ if (getForceCompletionOnStop() != null) {
+ answer.setForceCompletionOnStop(getForceCompletionOnStop());
+ }
return answer;
}
@@ -648,6 +653,29 @@ public class AggregateDefinition extends
return this;
}
+    /**
+     * Enables forced completion on stop: when the aggregator is stopped, the groups
+     * still being aggregated are considered complete and their aggregated exchanges
+     * are sent out.
+     *
+     * @return this definition, so further builder calls can be chained
+     */
+    public AggregateDefinition forceCompletionOnStop() {
+        setForceCompletionOnStop(Boolean.TRUE);
+        return this;
+    }
+
+    /**
+     * Returns the force-completion-on-stop flag exactly as configured.
+     *
+     * @return the configured value, or {@code null} if the flag was never set
+     */
+    public Boolean getForceCompletionOnStop() {
+        return this.forceCompletionOnStop;
+    }
+
+    /**
+     * Tells whether force completion on stop is enabled.
+     *
+     * @return {@code true} only when the flag has been set to true; an unset
+     *         ({@code null}) flag counts as {@code false}
+     */
+    public boolean isForceCompletionOnStop() {
+        return Boolean.TRUE.equals(forceCompletionOnStop);
+    }
+
+    /**
+     * Sets the force-completion-on-stop flag.
+     *
+     * @param forceCompletionOnStop {@code true} to enable, {@code false} to disable,
+     *                              or {@code null} to leave the option unset
+     */
+    public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
+        this.forceCompletionOnStop = forceCompletionOnStop;
+    }
+
/**
* Sending the aggregated output in parallel
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1173810&r1=1173809&r2=1173810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Sep 21 19:45:44 2011
@@ -115,6 +115,7 @@ public class AggregateProcessor extends
private boolean completionFromBatchConsumer;
private AtomicInteger batchConsumerCounter = new AtomicInteger();
private boolean discardOnCompletionTimeout;
+ private boolean forceCompletionOnStop;
private ProducerTemplate deadLetterProducerTemplate;
@@ -567,6 +568,10 @@ public class AggregateProcessor extends
this.discardOnCompletionTimeout = discardOnCompletionTimeout;
}
+    /**
+     * Configures whether stopping this processor forces completion of all groups.
+     *
+     * @param forceCompletionOnStop {@code true} to force completion of all groups
+     *                              when this processor is stopped
+     */
+    public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
+        this.forceCompletionOnStop = forceCompletionOnStop;
+    }
+
/**
* On completion task which keeps the booking of the in progress up to date
*/
@@ -859,6 +864,16 @@ public class AggregateProcessor extends
@Override
protected void doStop() throws Exception {
+
+ if (forceCompletionOnStop) {
+ forceCompletionOfAllGroups();
+
+ while (inProgressCompleteExchanges.size() > 0) {
+ LOG.trace("waiting for {} in progress exchanges to complete", inProgressCompleteExchanges.size());
+ Thread.sleep(100);
+ }
+ }
+
if (recoverService != null) {
camelContext.getExecutorServiceManager().shutdownNow(recoverService);
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * Verifies the aggregator's {@code forceCompletionOnStop} option: groups that have not
+ * reached their completion size are sent out when the context is stopped only if the
+ * option is enabled on the aggregator.
+ *
+ * @version 
+ */
+public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
+
+    // counts the aggregated exchanges that reach the end of either route
+    MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
+
+    public void testForceCompletionTrue() throws Exception {
+        sendIncompleteGroups("direct:forceCompletionTrue");
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+
+        context.stop();
+        // one aggregated exchange is expected per correlation id ("1" and "2")
+        assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
+    }
+
+    public void testForceCompletionFalse() throws Exception {
+        sendIncompleteGroups("direct:forceCompletionFalse");
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+
+        context.stop();
+        // without the option the pending groups must stay uncompleted after the stop
+        assertEquals("aggregation should not have completed on stop", 0, myCompletionProcessor.getAggregationCount());
+    }
+
+    /**
+     * Resets the completion counter, configures the shutdown strategy (timeout 5,
+     * shutdown-now on timeout) and sends four messages spread over two correlation ids,
+     * which keeps both groups below the completion size of 10 used by the routes.
+     *
+     * @param endpointUri the route entry point to send the messages to
+     */
+    private void sendIncompleteGroups(String endpointUri) throws Exception {
+        myCompletionProcessor.reset();
+        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+        context.getShutdownStrategy().setTimeout(5);
+
+        template.sendBodyAndHeader(endpointUri, "test1", "id", "1");
+        template.sendBodyAndHeader(endpointUri, "test2", "id", "2");
+        template.sendBodyAndHeader(endpointUri, "test3", "id", "1");
+        template.sendBodyAndHeader(endpointUri, "test4", "id", "2");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // the two routes differ only in the forceCompletionOnStop() option
+                from("direct:forceCompletionTrue")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
+                    .delay(100)
+                    .process(myCompletionProcessor);
+
+                from("direct:forceCompletionFalse")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
+                    .delay(100)
+                    .process(myCompletionProcessor);
+            }
+        };
+    }
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * Test processor that counts how many exchanges it has processed.
+ * <p/>
+ * The counter is deliberately static: the Spring variant of the test registers its own
+ * instance of this class as a bean, while the assertions read the count through the
+ * instance held by the test class, so all instances have to share one counter.
+ * An {@link AtomicInteger} is used so that increments are atomic and visible to the
+ * asserting thread in case the processor is invoked on a different thread.
+ */
+public class MyCompletionProcessor implements Processor {
+    private static final AtomicInteger AGGREGATION_COUNT = new AtomicInteger();
+
+    /**
+     * @return the number of exchanges processed (by any instance) since the last {@link #reset()}
+     */
+    public int getAggregationCount() {
+        return AGGREGATION_COUNT.get();
+    }
+
+    /**
+     * Counts the exchange; the exchange itself is not inspected or modified.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AGGREGATION_COUNT.incrementAndGet();
+    }
+
+    /**
+     * Sets the shared counter back to zero.
+     */
+    public void reset() {
+        AGGREGATION_COUNT.set(0);
+    }
+}
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateForceCompletionOnStopTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * Runs the tests inherited from {@link AggregateForceCompletionOnStopTest} against a
+ * Camel context that is created from a Spring XML file.
+ *
+ * @version 
+ */
+public class SpringAggregateForceCompletionOnStopTest extends AggregateForceCompletionOnStopTest {
+
+    /**
+     * Creates the Camel context from this test's Spring XML configuration.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml");
+    }
+
+}
\ No newline at end of file
Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml?rev=1173810&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Wed Sep 21 19:45:44 2011
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- Groups exchanges by the "id" header. forceCompletionOnStop="true" asks the aggregator to complete the pending groups when it is stopped; the test sends fewer than completionSize messages per group. -->
+ <!-- NOTE(review): the Java DSL version of these routes has a delay(100) step before the processor that is not mirrored here; confirm this is intentional. -->
+ <route>
+ <from uri="direct:forceCompletionTrue"/>
+ <aggregate strategyRef="aggregatorStrategy" forceCompletionOnStop="true" completionSize="10">
+ <correlationExpression><header>id</header></correlationExpression>
+ <process ref="myCompletionProcessor"/>
+ </aggregate>
+ </route>
+
+ <!-- Same aggregation without forceCompletionOnStop: the test expects that no aggregated exchange reaches the processor when the context is stopped. -->
+ <route>
+ <from uri="direct:forceCompletionFalse"/>
+ <aggregate strategyRef="aggregatorStrategy" completionSize="10">
+ <correlationExpression><header>id</header></correlationExpression>
+ <process ref="myCompletionProcessor"/>
+ </aggregate>
+ </route>
+
+ </camelContext>
+
+ <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+ <bean id="myCompletionProcessor" class="org.apache.camel.processor.aggregator.MyCompletionProcessor"/>
+
+</beans>