You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bo...@apache.org on 2011/09/21 21:45:45 UTC

svn commit: r1173810 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/ components/camel-spring/src/test/jav...

Author: boday
Date: Wed Sep 21 19:45:44 2011
New Revision: 1173810

URL: http://svn.apache.org/viewvc?rev=1173810&view=rev
Log:
CAMEL-4097 updated Aggregator to support forcing completion of groups when context is stopped

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1173810&r1=1173809&r2=1173810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Sep 21 19:45:44 2011
@@ -93,6 +93,8 @@ public class AggregateDefinition extends
     private Integer closeCorrelationKeyOnCompletion;
     @XmlAttribute
     private Boolean discardOnCompletionTimeout;
+    @XmlAttribute
+    private Boolean forceCompletionOnStop;
 
     public AggregateDefinition() {
     }
@@ -209,6 +211,9 @@ public class AggregateDefinition extends
         if (getDiscardOnCompletionTimeout() != null) {
             answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
         }
+        if (getForceCompletionOnStop() != null) {
+            answer.setForceCompletionOnStop(getForceCompletionOnStop());
+        }
 
         return answer;
     }
@@ -648,6 +653,29 @@ public class AggregateDefinition extends
         return this;
     }
 
+    /**
+     * Enables force completion on stop: all pending groups are considered complete
+     * and their aggregated exchanges are sent out when the aggregator is stopped
+     *
+     * @return builder
+     */
+    public AggregateDefinition forceCompletionOnStop() {
+        setForceCompletionOnStop(true);
+        return this;
+    }
+
+    public Boolean getForceCompletionOnStop() { // raw option value; null when it was never set
+        return forceCompletionOnStop;
+    }
+
+    public boolean isForceCompletionOnStop() { // null-safe: an unset option reads as false
+        return forceCompletionOnStop != null && forceCompletionOnStop;
+    }
+
+    public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { // passing null leaves the option unset
+        this.forceCompletionOnStop = forceCompletionOnStop;
+    }
+
     /**
      * Sending the aggregated output in parallel
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1173810&r1=1173809&r2=1173810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Sep 21 19:45:44 2011
@@ -115,6 +115,7 @@ public class AggregateProcessor extends 
     private boolean completionFromBatchConsumer;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
+    private boolean forceCompletionOnStop;
 
     private ProducerTemplate deadLetterProducerTemplate;
 
@@ -567,6 +568,10 @@ public class AggregateProcessor extends 
         this.discardOnCompletionTimeout = discardOnCompletionTimeout;
     }
 
+    public void setForceCompletionOnStop(boolean forceCompletionOnStop) { // the flag is read by doStop()
+        this.forceCompletionOnStop = forceCompletionOnStop;
+    }
+
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
@@ -859,6 +864,16 @@ public class AggregateProcessor extends 
 
     @Override
     protected void doStop() throws Exception {
+
+        if (forceCompletionOnStop) {
+            forceCompletionOfAllGroups();
+
+            while (inProgressCompleteExchanges.size() > 0) {
+                LOG.trace("waiting for {} in progress exchanges to complete", inProgressCompleteExchanges.size());
+                Thread.sleep(100);
+            }
+        }
+
         if (recoverService != null) {
             camelContext.getExecutorServiceManager().shutdownNow(recoverService);
         }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+
+/**
+ * Checks that groups still pending in the aggregator are completed on context stop only when forceCompletionOnStop is set.
+ */
+public class AggregateForceCompletionOnStopTest extends ContextTestSupport {
+
+    MyCompletionProcessor myCompletionProcessor = new MyCompletionProcessor();
+
+    public void testForceCompletionTrue() throws Exception {
+        myCompletionProcessor.reset(); // the counter is static, so clear whatever an earlier test left behind
+        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+        context.getShutdownStrategy().setTimeout(5);
+
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionTrue", "test4", "id", "2");
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+        context.stop(); // stopping is expected to complete both pending groups (id 1 and id 2)
+        assertEquals("aggregation should have completed", 2, myCompletionProcessor.getAggregationCount());
+    }
+
+    public void testForceCompletionFalse() throws Exception {
+        myCompletionProcessor.reset(); // the counter is static, so clear whatever an earlier test left behind
+        context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+        context.getShutdownStrategy().setTimeout(5);
+
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:forceCompletionFalse", "test4", "id", "2");
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+        context.stop(); // without forceCompletionOnStop the pending groups are expected to stay unsent
+        assertEquals("aggregation should not have completed yet", 0, myCompletionProcessor.getAggregationCount());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:forceCompletionTrue")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).forceCompletionOnStop().completionSize(10)
+                    .delay(100)
+                    .process(myCompletionProcessor);
+
+                from("direct:forceCompletionFalse")
+                    .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(10)
+                    .delay(100)
+                    .process(myCompletionProcessor);
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyCompletionProcessor.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class MyCompletionProcessor implements Processor { // test helper: counts the exchanges it is asked to process
+    private static int aggregationCount; // static, so every instance shares one counter
+
+    public int getAggregationCount() {
+        return aggregationCount;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        aggregationCount++; // NOTE(review): unsynchronized update of shared state - confirm completions run on one thread
+    }
+
+    public void reset() {
+        aggregationCount = 0;
+    }
+}

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java?rev=1173810&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.java Wed Sep 21 19:45:44 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregator.AggregateForceCompletionOnStopTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * Runs the inherited force-completion-on-stop tests against a Camel context created from Spring XML.
+ */
+public class SpringAggregateForceCompletionOnStopTest extends AggregateForceCompletionOnStopTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml");
+    }
+
+}
\ No newline at end of file

Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml?rev=1173810&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml (added)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateForceCompletionOnStopTest.xml Wed Sep 21 19:45:44 2011
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:forceCompletionTrue"/>
+            <aggregate strategyRef="aggregatorStrategy" forceCompletionOnStop="true" completionSize="10">
+                <correlationExpression><header>id</header></correlationExpression>
+                <process ref="myCompletionProcessor"/>
+            </aggregate>
+        </route>
+
+        <route>
+            <from uri="direct:forceCompletionFalse"/>
+            <aggregate strategyRef="aggregatorStrategy" completionSize="10">
+                <correlationExpression><header>id</header></correlationExpression>
+                <process ref="myCompletionProcessor"/>
+            </aggregate>
+        </route>
+
+    </camelContext>
+
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <bean id="myCompletionProcessor" class="org.apache.camel.processor.aggregator.MyCompletionProcessor"/>
+
+</beans>