You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 10:04:04 UTC

[2/3] camel git commit: CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.

CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a664b08c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a664b08c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a664b08c

Branch: refs/heads/master
Commit: a664b08cb3beb975ee5fe4e6971ecb5525e9a501
Parents: 53500cf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 09:04:51 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 09:08:20 2015 +0100

----------------------------------------------------------------------
 .../mbean/ManagedAggregateProcessorMBean.java   |  81 ++++++++++
 .../DefaultManagementObjectStrategy.java        |   4 +
 .../mbean/ManagedAggregateProcessor.java        | 152 ++++++++++++++++++
 .../processor/aggregate/AggregateProcessor.java |  16 +-
 .../ManagedAggregateControllerTest.java         | 159 +++++++++++++++++++
 .../aggregator/AggregateProcessorTest.java      |  22 +--
 ...teProcessorTimeoutCompletionRestartTest.java |   6 +-
 7 files changed, 424 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
new file mode 100644
index 0000000..f4bed8d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Correlation Expression")
+    String getCorrelationExpression();
+
+    @ManagedAttribute(description = "Completion timeout in millis")
+    long getCompletionTimeout();
+
+    @ManagedAttribute(description = "Completion timeout expression")
+    String getCompletionTimeoutExpression();
+
+    @ManagedAttribute(description = "Completion interval in millis")
+    long getCompletionInterval();
+
+    @ManagedAttribute(description = "Completion size")
+    int getCompletionSize();
+
+    @ManagedAttribute(description = "Completion size expression")
+    String getCompletionSizeExpression();
+
+    @ManagedAttribute(description = "Complete from batch consumers")
+    boolean isCompletionFromBatchConsumer();
+
+    @ManagedAttribute(description = "Ignore invalid correlation keys")
+    boolean isIgnoreInvalidCorrelationKeys();
+
+    @ManagedAttribute(description = "Whether to close the correlation group on completion")
+    Integer getCloseCorrelationKeyOnCompletion();
+
+    @ManagedAttribute(description = "Parallel mode")
+    boolean isParallelProcessing();
+
+    @ManagedAttribute(description = "Optimistic locking")
+    boolean isOptimisticLocking();
+
+    @ManagedAttribute(description = "Whether or not to eager check for completion when a new incoming Exchange has been received")
+    boolean isEagerCheckCompletion();
+
+    @ManagedAttribute(description = "A Predicate to indicate when an aggregated exchange is complete")
+    String getCompletionPredicate();
+
+    @ManagedAttribute(description = "Whether or not exchanges which complete due to a timeout should be discarded")
+    boolean isDiscardOnCompletionTimeout();
+
+    @ManagedAttribute(description = "Indicates to complete all current aggregated exchanges when the context is stopped")
+    boolean isForceCompletionOnStop();
+
+    @ManagedAttribute(description = "Number of completed exchanges which are currently in-flight")
+    int getInProgressCompleteExchanges();
+
+    @ManagedOperation(description = "Number of groups currently in the aggregation repository")
+    int aggregationRepositoryGroups();
+
+    @ManagedOperation(description = "To force completing a specific group by its key")
+    int forceCompletionOfGroup(String key);
+
+    @ManagedOperation(description = "To force complete of all groups")
+    int forceCompletionOfAllGroups();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index e4bff8b..76a169c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -31,6 +31,7 @@ import org.apache.camel.Service;
 import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.log.LogEndpoint;
 import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedAggregateProcessor;
 import org.apache.camel.management.mbean.ManagedBeanProcessor;
 import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
 import org.apache.camel.management.mbean.ManagedCamelContext;
@@ -58,6 +59,7 @@ import org.apache.camel.processor.ErrorHandler;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.Throttler;
 import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.spi.EventNotifier;
@@ -197,6 +199,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
                 answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
             } else if (target instanceof IdempotentConsumer) {
                 answer = new ManagedIdempotentConsumer(context, (IdempotentConsumer) target, definition);
+            } else if (target instanceof AggregateProcessor) {
+                answer = new ManagedAggregateProcessor(context, (AggregateProcessor) target, (org.apache.camel.model.AggregateDefinition) definition);
             } else if (target instanceof org.apache.camel.spi.ManagementAware) {
                 return ((org.apache.camel.spi.ManagementAware<Processor>) target).getManagedObject(processor);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
new file mode 100644
index 0000000..3b09c52
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedAggregateProcessorMBean;
+import org.apache.camel.model.AggregateDefinition;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
+import org.apache.camel.spi.ManagementStrategy;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed AggregateProcessor")
+public class ManagedAggregateProcessor extends ManagedProcessor implements ManagedAggregateProcessorMBean {
+    private final AggregateProcessor processor;
+
+    public ManagedAggregateProcessor(CamelContext context, AggregateProcessor processor, AggregateDefinition definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    public void init(ManagementStrategy strategy) {
+        super.init(strategy);
+    }
+
+    public AggregateProcessor getProcessor() {
+        return processor;
+    }
+
+    public String getCorrelationExpression() {
+        if (processor.getCorrelationExpression() != null) {
+            return processor.getCorrelationExpression().toString();
+        } else {
+            return null;
+        }
+    }
+
+    public long getCompletionTimeout() {
+        return processor.getCompletionTimeout();
+    }
+
+    public String getCompletionTimeoutExpression() {
+        if (processor.getCompletionTimeoutExpression() != null) {
+            return processor.getCompletionTimeoutExpression().toString();
+        } else {
+            return null;
+        }
+    }
+
+    public long getCompletionInterval() {
+        return processor.getCompletionInterval();
+    }
+
+    public int getCompletionSize() {
+        return processor.getCompletionSize();
+    }
+
+    public String getCompletionSizeExpression() {
+        if (processor.getCompletionSizeExpression() != null) {
+            return processor.getCompletionSizeExpression().toString();
+        } else {
+            return null;
+        }
+    }
+
+    public boolean isCompletionFromBatchConsumer() {
+        return processor.isCompletionFromBatchConsumer();
+    }
+
+    public boolean isIgnoreInvalidCorrelationKeys() {
+        return processor.isIgnoreInvalidCorrelationKeys();
+    }
+
+    public Integer getCloseCorrelationKeyOnCompletion() {
+        return processor.getCloseCorrelationKeyOnCompletion();
+    }
+
+    public boolean isParallelProcessing() {
+        return processor.isParallelProcessing();
+    }
+
+    public boolean isOptimisticLocking() {
+        return processor.isOptimisticLocking();
+    }
+
+    public boolean isEagerCheckCompletion() {
+        return processor.isEagerCheckCompletion();
+    }
+
+    public String getCompletionPredicate() {
+        if (processor.getCompletionPredicate() != null) {
+            return processor.getCompletionPredicate().toString();
+        } else {
+            return null;
+        }
+    }
+
+    public boolean isDiscardOnCompletionTimeout() {
+        return processor.isDiscardOnCompletionTimeout();
+    }
+
+    public boolean isForceCompletionOnStop() {
+        return processor.isCompletionFromBatchConsumer();
+    }
+
+    public int getInProgressCompleteExchanges() {
+        return processor.getInProgressCompleteExchanges();
+    }
+
+    public int aggregationRepositoryGroups() {
+        Set<String> keys = processor.getAggregationRepository().getKeys();
+        if (keys != null) {
+            return keys.size();
+        } else {
+            return 0;
+        }
+    }
+
+    public int forceCompletionOfGroup(String key) {
+        if (processor.getAggregateController() != null) {
+            return processor.getAggregateController().forceCompletionOfGroup(key);
+        } else {
+            return 0;
+        }
+    }
+
+    public int forceCompletionOfAllGroups() {
+        if (processor.getAggregateController() != null) {
+            return processor.getAggregateController().forceCompletionOfAllGroups();
+        } else {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 33c97b4..1a773ea 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -134,6 +134,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     private ProducerTemplate deadLetterProducerTemplate;
 
+    @Deprecated
+    public AggregateProcessor(CamelContext camelContext, Processor processor,
+                              Expression correlationExpression, AggregationStrategy aggregationStrategy,
+                              ExecutorService executorService, boolean shutdownExecutorService) {
+        this(camelContext, "aggregate", processor, correlationExpression, aggregationStrategy, executorService, shutdownExecutorService);
+    }
+
     public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
                               Expression correlationExpression, AggregationStrategy aggregationStrategy,
                               ExecutorService executorService, boolean shutdownExecutorService) {
@@ -618,6 +625,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         timeoutMap.put(key, exchange.getExchangeId(), timeout);
     }
 
+    public int getInProgressCompleteExchanges() {
+        return inProgressCompleteExchanges.size();
+    }
+
     public Predicate getCompletionPredicate() {
         return completionPredicate;
     }
@@ -1130,9 +1141,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             ServiceHelper.startService(timeoutMap);
         }
 
-        if (aggregateController != null) {
-            aggregateController.onStart(id, this);
+        if (aggregateController == null) {
+            aggregateController = new DefaultAggregateController();
         }
+        aggregateController.onStart(id, this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
new file mode 100644
index 0000000..7860eca
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.DefaultAggregateController;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ManagedAggregateControllerTest extends ManagementTestSupport {
+
+    private AggregateController controller = new DefaultAggregateController();
+
+    @Test
+    public void testForceCompletionOfAll() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"myAggregator\"");
+        assertTrue(mbeanServer.isRegistered(on));
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(2);
+        getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+        Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+        assertEquals(2, pending.intValue());
+
+        Integer groups = (Integer) mbeanServer.invoke(on, "forceCompletionOfAllGroups", null, null);
+        assertEquals(2, groups.intValue());
+
+        assertMockEndpointsSatisfied();
+
+        Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
+        assertEquals(4, completed.longValue());
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
+        assertEquals(10, size.longValue());
+
+        String cor = (String) mbeanServer.getAttribute(on, "CorrelationExpression");
+        assertEquals("header(id)", cor);
+
+        Integer inflight = (Integer) mbeanServer.getAttribute(on, "InProgressCompleteExchanges");
+        assertEquals(0, inflight.intValue());
+
+        pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+        assertEquals(0, pending.intValue());
+    }
+
+    @Test
+    public void testForceCompletionOfGroup() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MBeanServer mbeanServer = getMBeanServer();
+
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"myAggregator\"");
+        assertTrue(mbeanServer.isRegistered(on));
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+        Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+        assertEquals(2, pending.intValue());
+
+        Integer groups = (Integer) mbeanServer.invoke(on, "forceCompletionOfGroup", new Object[]{"1"}, new String[]{"java.lang.String"});
+        assertEquals(1, groups.intValue());
+
+        assertMockEndpointsSatisfied();
+
+        Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
+        assertEquals(4, completed.longValue());
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
+        assertEquals(10, size.longValue());
+
+        String cor = (String) mbeanServer.getAttribute(on, "CorrelationExpression");
+        assertEquals("header(id)", cor);
+
+        Integer inflight = (Integer) mbeanServer.getAttribute(on, "InProgressCompleteExchanges");
+        assertEquals(0, inflight.intValue());
+
+        pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+        assertEquals(1, pending.intValue());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller).id("myAggregator")
+                        .completionSize(10)
+                    .to("mock:aggregated");
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index 9ae8b61..e541442 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -61,7 +61,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(false);
         ap.start();
@@ -102,7 +102,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().isEqualTo("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setEagerCheckCompletion(true);
         ap.start();
@@ -150,7 +150,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionSize(3);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -198,7 +198,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionTimeout(3000);
         ap.setEagerCheckCompletion(eager);
         ap.start();
@@ -247,7 +247,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionInterval(3000);
         ap.start();
 
@@ -288,7 +288,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setIgnoreInvalidCorrelationKeys(true);
 
@@ -328,7 +328,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
 
         ap.start();
@@ -373,7 +373,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         AggregationStrategy as = new BodyInAggregatingStrategy();
         Predicate complete = body().contains("END");
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionPredicate(complete);
         ap.setCloseCorrelationKeyOnCompletion(1000);
 
@@ -418,7 +418,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionSize(100);
         ap.setCompletionFromBatchConsumer(true);
 
@@ -515,7 +515,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setEagerCheckCompletion(true);
         ap.setCompletionPredicate(body().isEqualTo("END"));
         if (handler != null) {
@@ -566,7 +566,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         ap.setCompletionSize(10);
         ap.start();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
index 987f9a6..45f0c47 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeout(2000);
         ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
         Expression corr = header("id");
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+        AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
         // start with a high timeout so no completes before we stop
         ap.setCompletionTimeoutExpression(header("myTimeout"));
         ap.start();