You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 10:04:05 UTC

[3/3] camel git commit: CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.

CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4bb4be75
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4bb4be75
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4bb4be75

Branch: refs/heads/master
Commit: 4bb4be75181a89d8f5f12217188be3455d35571b
Parents: a664b08
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 10:05:58 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 10:05:58 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  2 +-
 .../aggregate/AggregateController.java          |  6 +--
 .../processor/aggregate/AggregateProcessor.java | 21 ++--------
 .../aggregate/DefaultAggregateController.java   | 27 ++----------
 .../aggregator/AggregateControllerTest.java     | 18 +++++---
 .../aggregator/AggregateProcessorTest.java      | 22 +++++-----
 ...teProcessorTimeoutCompletionRestartTest.java |  6 +--
 .../SpringAggregateControllerTest.java          | 39 ++++++++++++++++++
 .../SpringAggregateControllerTest.xml           | 43 ++++++++++++++++++++
 9 files changed, 117 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 087bc74..c880c8a 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -187,7 +187,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
             shutdownThreadPool = true;
         }
 
-        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), getId(), internal,
+        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
                 correlation, strategy, threadPool, shutdownThreadPool);
 
         AggregationRepository repository = createAggregationRepository(routeContext);

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
index dab9d6e..974ee9a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -25,18 +25,16 @@ public interface AggregateController {
     /**
      * Callback when the aggregate processor is started.
      *
-     * @param id        the aggregator id
      * @param processor the aggregate processor
      */
-    void onStart(String id, AggregateProcessor processor);
+    void onStart(AggregateProcessor processor);
 
     /**
      * Callback when the aggregate processor is stopped.
      *
-     * @param id        the aggregator id
      * @param processor the aggregate processor
      */
-    void onStop(String id, AggregateProcessor processor);
+    void onStop(AggregateProcessor processor);
 
     /**
      * To force completing a specific group by its key.

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 1a773ea..8e34284 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -47,7 +47,6 @@ import org.apache.camel.TimeoutMap;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.HasId;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.spi.ShutdownPrepared;
@@ -80,7 +79,7 @@ import org.slf4j.LoggerFactory;
  * and older prices are discarded). Another idea is to combine line item messages
  * together into a single invoice message.
  */
-public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, HasId {
+public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared {
 
     public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
 
@@ -88,7 +87,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     private final Lock lock = new ReentrantLock();
     private final CamelContext camelContext;
-    private final String id;
     private final Processor processor;
     private AggregationStrategy aggregationStrategy;
     private Expression correlationExpression;
@@ -134,24 +132,15 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     private ProducerTemplate deadLetterProducerTemplate;
 
-    @Deprecated
     public AggregateProcessor(CamelContext camelContext, Processor processor,
                               Expression correlationExpression, AggregationStrategy aggregationStrategy,
                               ExecutorService executorService, boolean shutdownExecutorService) {
-        this(camelContext, "aggregate", processor, correlationExpression, aggregationStrategy, executorService, shutdownExecutorService);
-    }
-
-    public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
-                              Expression correlationExpression, AggregationStrategy aggregationStrategy,
-                              ExecutorService executorService, boolean shutdownExecutorService) {
         ObjectHelper.notNull(camelContext, "camelContext");
-        ObjectHelper.notNull(id, "id");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(correlationExpression, "correlationExpression");
         ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
         ObjectHelper.notNull(executorService, "executorService");
         this.camelContext = camelContext;
-        this.id = id;
         this.processor = processor;
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
@@ -160,10 +149,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
-    public String getId() {
-        return id;
-    }
-
     @Override
     public String toString() {
         return "AggregateProcessor[to: " + processor + "]";
@@ -1144,7 +1129,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         if (aggregateController == null) {
             aggregateController = new DefaultAggregateController();
         }
-        aggregateController.onStart(id, this);
+        aggregateController.onStart(this);
     }
 
     @Override
@@ -1154,7 +1139,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         // and is better suited for preparing to shutdown than this doStop method is
 
         if (aggregateController != null) {
-            aggregateController.onStop(id, this);
+            aggregateController.onStop(this);
         }
 
         if (recoverService != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
index 7bb3448..0888ae5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -16,30 +16,21 @@
  */
 package org.apache.camel.processor.aggregate;
 
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.support.ServiceSupport;
-
 /**
  * A default {@link org.apache.camel.processor.aggregate.AggregateController} that offers Java and JMX API.
  */
-@ManagedResource(description = "Aggregation controller")
-public class DefaultAggregateController extends ServiceSupport implements AggregateController {
+public class DefaultAggregateController implements AggregateController {
 
     private AggregateProcessor processor;
-    private String id;
 
-    public void onStart(String id, AggregateProcessor processor) {
-        this.id = id;
+    public void onStart(AggregateProcessor processor) {
         this.processor = processor;
     }
 
-    public void onStop(String id, AggregateProcessor processor) {
-        this.id = id;
+    public void onStop(AggregateProcessor processor) {
         this.processor = null;
     }
 
-    @ManagedOperation(description = "To force completion a group on the aggregator")
     public int forceCompletionOfGroup(String key) {
         if (processor != null) {
             return processor.forceCompletionOfGroup(key);
@@ -48,7 +39,6 @@ public class DefaultAggregateController extends ServiceSupport implements Aggreg
         }
     }
 
-    @ManagedOperation(description = "To force completion all groups on the aggregator")
     public int forceCompletionOfAllGroups() {
         if (processor != null) {
             return processor.forceCompletionOfAllGroups();
@@ -57,15 +47,4 @@ public class DefaultAggregateController extends ServiceSupport implements Aggreg
         }
     }
 
-    protected void doStart() throws Exception {
-        // noop
-    }
-
-    protected void doStop() throws Exception {
-        // noop
-    }
-
-    public String toString() {
-        return "DefaultAggregateController[" + id + "]";
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index e00fdfd..e9dee08 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -29,7 +29,14 @@ import org.junit.Test;
  */
 public class AggregateControllerTest extends ContextTestSupport {
 
-    private AggregateController controller = new DefaultAggregateController();
+    private AggregateController controller;
+
+    public AggregateController getAggregateController() {
+        if (controller == null) {
+            controller = new DefaultAggregateController();
+        }
+        return controller;
+    }
 
     @Test
     public void testForceCompletionOfAll() throws Exception {
@@ -46,7 +53,7 @@ public class AggregateControllerTest extends ContextTestSupport {
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
 
-        int groups = controller.forceCompletionOfAllGroups();
+        int groups = getAggregateController().forceCompletionOfAllGroups();
         assertEquals(2, groups);
 
         assertMockEndpointsSatisfied();
@@ -67,7 +74,7 @@ public class AggregateControllerTest extends ContextTestSupport {
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
         getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
 
-        int groups = controller.forceCompletionOfGroup("1");
+        int groups = getAggregateController().forceCompletionOfGroup("1");
         assertEquals(1, groups);
 
         assertMockEndpointsSatisfied();
@@ -76,13 +83,12 @@ public class AggregateControllerTest extends ContextTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
-
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller)
+                        .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController())
                         .completionSize(10)
-                    .to("mock:aggregated");
+                        .to("mock:aggregated");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index e541442..9ae8b61 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -61,7 +61,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(false);
         ap.start();
@@ -102,7 +102,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().isEqualTo("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(true);
         ap.start();
@@ -150,7 +150,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(3);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -198,7 +198,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionTimeout(3000);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -247,7 +247,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionInterval(3000);
         ap.start();
 
@@ -288,7 +288,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setIgnoreInvalidCorrelationKeys(true);
 
@@ -328,7 +328,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
 
         ap.start();
@@ -373,7 +373,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setCloseCorrelationKeyOnCompletion(1000);
 
@@ -418,7 +418,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(100);
         ap.setCompletionFromBatchConsumer(true);
 
@@ -515,7 +515,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setEagerCheckCompletion(true);
         ap.setCompletionPredicate(body().isEqualTo("END"));
         if (handler != null) {
@@ -566,7 +566,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         ap.setCompletionSize(10);
         ap.start();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
index 45f0c47..987f9a6 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeout(2000);
         ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
new file mode 100644
index 0000000..d3b7518
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregator.AggregateControllerTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version 
+ */
+public class SpringAggregateControllerTest extends AggregateControllerTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml");
+    }
+
+    @Override
+    public AggregateController getAggregateController() {
+        return context.getRegistry().lookupByNameAndType("myController", AggregateController.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
new file mode 100644
index 0000000..7c2aceb
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <aggregate strategyRef="aggregatorStrategy" completionSize="10" aggregateControllerRef="myController">
+                <correlationExpression>
+                    <simple>header.id</simple>
+                </correlationExpression>
+                <completionSize>
+                    <constant>10</constant>
+                </completionSize>
+                <to uri="mock:aggregated"/>
+            </aggregate>
+        </route>
+    </camelContext>
+
+    <bean id="myController" class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregator.AggregateControllerTest.MyAggregationStrategy"/>
+
+</beans>