You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 10:04:03 UTC
[1/3] camel git commit: CAMEL-7434: Aggregate now allows using a
controller to control completion of groups from external source.
Repository: camel
Updated Branches:
refs/heads/master ae8ce7379 -> 4bb4be751
CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/53500cf7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/53500cf7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/53500cf7
Branch: refs/heads/master
Commit: 53500cf7aca91373fad5e3f77c5537f493351df7
Parents: ae8ce73
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 08:31:43 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 08:31:43 2015 +0100
----------------------------------------------------------------------
.../apache/camel/model/AggregateDefinition.java | 41 +++++++-
.../aggregate/AggregateController.java | 56 ++++++++++
.../processor/aggregate/AggregateProcessor.java | 57 +++++++++-
.../aggregate/DefaultAggregateController.java | 71 +++++++++++++
.../aggregator/AggregateControllerTest.java | 103 +++++++++++++++++++
5 files changed, 324 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index f5d3746..087bc74 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -35,6 +35,7 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.aggregate.AggregateController;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -113,6 +114,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
private Boolean discardOnCompletionTimeout;
@XmlAttribute
private Boolean forceCompletionOnStop;
+ @XmlTransient
+ private AggregateController aggregateController;
+ @XmlAttribute
+ private String aggregateControllerRef;
@XmlElementRef
private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
@@ -182,7 +187,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
shutdownThreadPool = true;
}
- AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
+ AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), getId(), internal,
correlation, strategy, threadPool, shutdownThreadPool);
AggregationRepository repository = createAggregationRepository(routeContext);
@@ -190,6 +195,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
answer.setAggregationRepository(repository);
}
+ if (getAggregateController() == null && getAggregateControllerRef() != null) {
+ setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class));
+ }
+
// this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
boolean shutdownTimeoutThreadPool = false;
ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
@@ -264,6 +273,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
} else {
answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
}
+ if (getAggregateController() != null) {
+ answer.setAggregateController(getAggregateController());
+ }
return answer;
}
@@ -613,6 +625,22 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
this.forceCompletionOnStop = forceCompletionOnStop;
}
+ public AggregateController getAggregateController() {
+ return aggregateController;
+ }
+
+ public void setAggregateController(AggregateController aggregateController) {
+ this.aggregateController = aggregateController;
+ }
+
+ public String getAggregateControllerRef() {
+ return aggregateControllerRef;
+ }
+
+ public void setAggregateControllerRef(String aggregateControllerRef) {
+ this.aggregateControllerRef = aggregateControllerRef;
+ }
+
// Fluent API
//-------------------------------------------------------------------------
@@ -900,7 +928,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
setTimeoutCheckerExecutorServiceRef(executorServiceRef);
return this;
}
-
+
+ /**
+ * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
+ * this aggregator.
+ */
+ public AggregateDefinition aggregateController(AggregateController aggregateController) {
+ setAggregateController(aggregateController);
+ return this;
+ }
+
// Section - Methods from ExpressionNode
// Needed to copy methods from ExpressionNode here so that I could specify the
// correlation expression as optional in JAXB
http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
new file mode 100644
index 0000000..dab9d6e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * A controller which allows controlling a {@link org.apache.camel.processor.aggregate.AggregateProcessor} from an
+ * external source, such as Java API or JMX. This can be used to force completion of aggregated groups, and more.
+ */
+public interface AggregateController {
+
+ /**
+ * Callback when the aggregate processor is started.
+ *
+ * @param id the aggregator id
+ * @param processor the aggregate processor
+ */
+ void onStart(String id, AggregateProcessor processor);
+
+ /**
+ * Callback when the aggregate processor is stopped.
+ *
+ * @param id the aggregator id
+ * @param processor the aggregate processor
+ */
+ void onStop(String id, AggregateProcessor processor);
+
+ /**
+ * To force completing a specific group by its key.
+ *
+ * @param key the key
+ * @return <tt>1</tt> if the group was forced completed, <tt>0</tt> if the group does not exists
+ */
+ int forceCompletionOfGroup(String key);
+
+ /**
+ * To force complete of all groups
+ *
+ * @return number of groups that was forced completed
+ */
+ int forceCompletionOfAllGroups();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 244501a..33c97b4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -47,6 +47,7 @@ import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.HasId;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
@@ -79,7 +80,7 @@ import org.slf4j.LoggerFactory;
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
*/
-public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared {
+public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, HasId {
public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
@@ -87,9 +88,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private final Lock lock = new ReentrantLock();
private final CamelContext camelContext;
+ private final String id;
private final Processor processor;
private AggregationStrategy aggregationStrategy;
private Expression correlationExpression;
+ private AggregateController aggregateController;
private final ExecutorService executorService;
private final boolean shutdownExecutorService;
private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
@@ -131,15 +134,17 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private ProducerTemplate deadLetterProducerTemplate;
- public AggregateProcessor(CamelContext camelContext, Processor processor,
+ public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
Expression correlationExpression, AggregationStrategy aggregationStrategy,
ExecutorService executorService, boolean shutdownExecutorService) {
ObjectHelper.notNull(camelContext, "camelContext");
+ ObjectHelper.notNull(id, "id");
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(correlationExpression, "correlationExpression");
ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
ObjectHelper.notNull(executorService, "executorService");
this.camelContext = camelContext;
+ this.id = id;
this.processor = processor;
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
@@ -148,6 +153,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
}
+ public String getId() {
+ return id;
+ }
+
@Override
public String toString() {
return "AggregateProcessor[to: " + processor + "]";
@@ -773,6 +782,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
this.correlationExpression = correlationExpression;
}
+ public AggregateController getAggregateController() {
+ return aggregateController;
+ }
+
+ public void setAggregateController(AggregateController aggregateController) {
+ this.aggregateController = aggregateController;
+ }
+
/**
* On completion task which keeps the booking of the in progress up to date
*/
@@ -1112,6 +1129,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
restoreTimeoutMapFromAggregationRepository();
ServiceHelper.startService(timeoutMap);
}
+
+ if (aggregateController != null) {
+ aggregateController.onStart(id, this);
+ }
}
@Override
@@ -1120,6 +1141,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
// as this is handled in the prepareShutdown method which is also invoked when stopping a route
// and is better suited for preparing to shutdown than this doStop method is
+ if (aggregateController != null) {
+ aggregateController.onStop(id, this);
+ }
+
if (recoverService != null) {
camelContext.getExecutorServiceManager().shutdown(recoverService);
}
@@ -1184,6 +1209,34 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
super.doShutdown();
}
+ public int forceCompletionOfGroup(String key) {
+ // must acquire the shared aggregation lock to be able to trigger force completion
+ int total = 0;
+
+ if (!optimisticLocking) { lock.lock(); }
+ try {
+ Exchange exchange = aggregationRepository.get(camelContext, key);
+ if (exchange != null) {
+ total = 1;
+ LOG.trace("Force completion triggered for correlation key: {}", key);
+ // indicate it was completed by a force completion request
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ Exchange answer = onCompletion(key, exchange, exchange, false);
+ if (answer != null) {
+ onSubmitCompletion(key, answer);
+ }
+ }
+ } finally {
+ if (!optimisticLocking) { lock.unlock(); }
+ }
+ LOG.trace("Completed force completion of group {}", key);
+
+ if (total > 0) {
+ LOG.debug("Forcing completion of group {} with {} exchanges", key, total);
+ }
+ return total;
+ }
+
public int forceCompletionOfAllGroups() {
// only run if CamelContext has been fully started or is stopping
http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
new file mode 100644
index 0000000..7bb3448
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * A default {@link org.apache.camel.processor.aggregate.AggregateController} that offers Java and JMX API.
+ */
+@ManagedResource(description = "Aggregation controller")
+public class DefaultAggregateController extends ServiceSupport implements AggregateController {
+
+ private AggregateProcessor processor;
+ private String id;
+
+ public void onStart(String id, AggregateProcessor processor) {
+ this.id = id;
+ this.processor = processor;
+ }
+
+ public void onStop(String id, AggregateProcessor processor) {
+ this.id = id;
+ this.processor = null;
+ }
+
+ @ManagedOperation(description = "To force completion a group on the aggregator")
+ public int forceCompletionOfGroup(String key) {
+ if (processor != null) {
+ return processor.forceCompletionOfGroup(key);
+ } else {
+ return 0;
+ }
+ }
+
+ @ManagedOperation(description = "To force completion all groups on the aggregator")
+ public int forceCompletionOfAllGroups() {
+ if (processor != null) {
+ return processor.forceCompletionOfAllGroups();
+ } else {
+ return 0;
+ }
+ }
+
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ protected void doStop() throws Exception {
+ // noop
+ }
+
+ public String toString() {
+ return "DefaultAggregateController[" + id + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
new file mode 100644
index 0000000..e00fdfd
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.DefaultAggregateController;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class AggregateControllerTest extends ContextTestSupport {
+
+ private AggregateController controller = new DefaultAggregateController();
+
+ @Test
+ public void testForceCompletionOfAll() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+ assertMockEndpointsSatisfied();
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(2);
+ getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+ int groups = controller.forceCompletionOfAllGroups();
+ assertEquals(2, groups);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testForceCompletionOfGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+ assertMockEndpointsSatisfied();
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+ getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+ int groups = controller.forceCompletionOfGroup("1");
+ assertEquals(1, groups);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller)
+ .completionSize(10)
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
[3/3] camel git commit: CAMEL-7434: Aggregate now allows using a
controller to control completion of groups from external source.
Posted by da...@apache.org.
CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4bb4be75
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4bb4be75
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4bb4be75
Branch: refs/heads/master
Commit: 4bb4be75181a89d8f5f12217188be3455d35571b
Parents: a664b08
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 10:05:58 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 10:05:58 2015 +0100
----------------------------------------------------------------------
.../apache/camel/model/AggregateDefinition.java | 2 +-
.../aggregate/AggregateController.java | 6 +--
.../processor/aggregate/AggregateProcessor.java | 21 ++--------
.../aggregate/DefaultAggregateController.java | 27 ++----------
.../aggregator/AggregateControllerTest.java | 18 +++++---
.../aggregator/AggregateProcessorTest.java | 22 +++++-----
...teProcessorTimeoutCompletionRestartTest.java | 6 +--
.../SpringAggregateControllerTest.java | 39 ++++++++++++++++++
.../SpringAggregateControllerTest.xml | 43 ++++++++++++++++++++
9 files changed, 117 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 087bc74..c880c8a 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -187,7 +187,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
shutdownThreadPool = true;
}
- AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), getId(), internal,
+ AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
correlation, strategy, threadPool, shutdownThreadPool);
AggregationRepository repository = createAggregationRepository(routeContext);
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
index dab9d6e..974ee9a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -25,18 +25,16 @@ public interface AggregateController {
/**
* Callback when the aggregate processor is started.
*
- * @param id the aggregator id
* @param processor the aggregate processor
*/
- void onStart(String id, AggregateProcessor processor);
+ void onStart(AggregateProcessor processor);
/**
* Callback when the aggregate processor is stopped.
*
- * @param id the aggregator id
* @param processor the aggregate processor
*/
- void onStop(String id, AggregateProcessor processor);
+ void onStop(AggregateProcessor processor);
/**
* To force completing a specific group by its key.
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 1a773ea..8e34284 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -47,7 +47,6 @@ import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.HasId;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
@@ -80,7 +79,7 @@ import org.slf4j.LoggerFactory;
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
*/
-public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, HasId {
+public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared {
public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
@@ -88,7 +87,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private final Lock lock = new ReentrantLock();
private final CamelContext camelContext;
- private final String id;
private final Processor processor;
private AggregationStrategy aggregationStrategy;
private Expression correlationExpression;
@@ -134,24 +132,15 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private ProducerTemplate deadLetterProducerTemplate;
- @Deprecated
public AggregateProcessor(CamelContext camelContext, Processor processor,
Expression correlationExpression, AggregationStrategy aggregationStrategy,
ExecutorService executorService, boolean shutdownExecutorService) {
- this(camelContext, "aggregate", processor, correlationExpression, aggregationStrategy, executorService, shutdownExecutorService);
- }
-
- public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
- Expression correlationExpression, AggregationStrategy aggregationStrategy,
- ExecutorService executorService, boolean shutdownExecutorService) {
ObjectHelper.notNull(camelContext, "camelContext");
- ObjectHelper.notNull(id, "id");
ObjectHelper.notNull(processor, "processor");
ObjectHelper.notNull(correlationExpression, "correlationExpression");
ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
ObjectHelper.notNull(executorService, "executorService");
this.camelContext = camelContext;
- this.id = id;
this.processor = processor;
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
@@ -160,10 +149,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
}
- public String getId() {
- return id;
- }
-
@Override
public String toString() {
return "AggregateProcessor[to: " + processor + "]";
@@ -1144,7 +1129,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
if (aggregateController == null) {
aggregateController = new DefaultAggregateController();
}
- aggregateController.onStart(id, this);
+ aggregateController.onStart(this);
}
@Override
@@ -1154,7 +1139,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
// and is better suited for preparing to shutdown than this doStop method is
if (aggregateController != null) {
- aggregateController.onStop(id, this);
+ aggregateController.onStop(this);
}
if (recoverService != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
index 7bb3448..0888ae5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -16,30 +16,21 @@
*/
package org.apache.camel.processor.aggregate;
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.support.ServiceSupport;
-
/**
* A default {@link org.apache.camel.processor.aggregate.AggregateController} that offers Java and JMX API.
*/
-@ManagedResource(description = "Aggregation controller")
-public class DefaultAggregateController extends ServiceSupport implements AggregateController {
+public class DefaultAggregateController implements AggregateController {
private AggregateProcessor processor;
- private String id;
- public void onStart(String id, AggregateProcessor processor) {
- this.id = id;
+ public void onStart(AggregateProcessor processor) {
this.processor = processor;
}
- public void onStop(String id, AggregateProcessor processor) {
- this.id = id;
+ public void onStop(AggregateProcessor processor) {
this.processor = null;
}
- @ManagedOperation(description = "To force completion a group on the aggregator")
public int forceCompletionOfGroup(String key) {
if (processor != null) {
return processor.forceCompletionOfGroup(key);
@@ -48,7 +39,6 @@ public class DefaultAggregateController extends ServiceSupport implements Aggreg
}
}
- @ManagedOperation(description = "To force completion all groups on the aggregator")
public int forceCompletionOfAllGroups() {
if (processor != null) {
return processor.forceCompletionOfAllGroups();
@@ -57,15 +47,4 @@ public class DefaultAggregateController extends ServiceSupport implements Aggreg
}
}
- protected void doStart() throws Exception {
- // noop
- }
-
- protected void doStop() throws Exception {
- // noop
- }
-
- public String toString() {
- return "DefaultAggregateController[" + id + "]";
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index e00fdfd..e9dee08 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -29,7 +29,14 @@ import org.junit.Test;
*/
public class AggregateControllerTest extends ContextTestSupport {
- private AggregateController controller = new DefaultAggregateController();
+ private AggregateController controller;
+
+ public AggregateController getAggregateController() {
+ if (controller == null) {
+ controller = new DefaultAggregateController();
+ }
+ return controller;
+ }
@Test
public void testForceCompletionOfAll() throws Exception {
@@ -46,7 +53,7 @@ public class AggregateControllerTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
- int groups = controller.forceCompletionOfAllGroups();
+ int groups = getAggregateController().forceCompletionOfAllGroups();
assertEquals(2, groups);
assertMockEndpointsSatisfied();
@@ -67,7 +74,7 @@ public class AggregateControllerTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
- int groups = controller.forceCompletionOfGroup("1");
+ int groups = getAggregateController().forceCompletionOfGroup("1");
assertEquals(1, groups);
assertMockEndpointsSatisfied();
@@ -76,13 +83,12 @@ public class AggregateControllerTest extends ContextTestSupport {
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
-
@Override
public void configure() throws Exception {
from("direct:start")
- .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller)
+ .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController())
.completionSize(10)
- .to("mock:aggregated");
+ .to("mock:aggregated");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index e541442..9ae8b61 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -61,7 +61,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(false);
ap.start();
@@ -102,7 +102,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().isEqualTo("END");
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(true);
ap.start();
@@ -150,7 +150,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(3);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -198,7 +198,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionTimeout(3000);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -247,7 +247,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionInterval(3000);
ap.start();
@@ -288,7 +288,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setIgnoreInvalidCorrelationKeys(true);
@@ -328,7 +328,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.start();
@@ -373,7 +373,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setCloseCorrelationKeyOnCompletion(1000);
@@ -418,7 +418,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(100);
ap.setCompletionFromBatchConsumer(true);
@@ -515,7 +515,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setEagerCheckCompletion(true);
ap.setCompletionPredicate(body().isEqualTo("END"));
if (handler != null) {
@@ -566,7 +566,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
ap.setCompletionSize(10);
ap.start();
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
index 45f0c47..987f9a6 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeout(2000);
ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
new file mode 100644
index 0000000..d3b7518
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregator.AggregateControllerTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version
+ */
+public class SpringAggregateControllerTest extends AggregateControllerTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml");
+ }
+
+ @Override
+ public AggregateController getAggregateController() {
+ return context.getRegistry().lookupByNameAndType("myController", AggregateController.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/4bb4be75/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
new file mode 100644
index 0000000..7c2aceb
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateControllerTest.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:start"/>
+ <aggregate strategyRef="aggregatorStrategy" completionSize="10" aggregateControllerRef="myController">
+ <correlationExpression>
+ <simple>header.id</simple>
+ </correlationExpression>
+ <completionSize>
+ <constant>10</constant>
+ </completionSize>
+ <to uri="mock:aggregated"/>
+ </aggregate>
+ </route>
+ </camelContext>
+
+ <bean id="myController" class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
+ <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregator.AggregateControllerTest.MyAggregationStrategy"/>
+
+</beans>
[2/3] camel git commit: CAMEL-7434: Aggregate now allows using a
controller to control completion of groups from external source.
Posted by da...@apache.org.
CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a664b08c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a664b08c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a664b08c
Branch: refs/heads/master
Commit: a664b08cb3beb975ee5fe4e6971ecb5525e9a501
Parents: 53500cf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 09:04:51 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 09:08:20 2015 +0100
----------------------------------------------------------------------
.../mbean/ManagedAggregateProcessorMBean.java | 81 ++++++++++
.../DefaultManagementObjectStrategy.java | 4 +
.../mbean/ManagedAggregateProcessor.java | 152 ++++++++++++++++++
.../processor/aggregate/AggregateProcessor.java | 16 +-
.../ManagedAggregateControllerTest.java | 159 +++++++++++++++++++
.../aggregator/AggregateProcessorTest.java | 22 +--
...teProcessorTimeoutCompletionRestartTest.java | 6 +-
7 files changed, 424 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
new file mode 100644
index 0000000..f4bed8d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
+
+ @ManagedAttribute(description = "Correlation Expression")
+ String getCorrelationExpression();
+
+ @ManagedAttribute(description = "Completion timeout in millis")
+ long getCompletionTimeout();
+
+ @ManagedAttribute(description = "Completion timeout expression")
+ String getCompletionTimeoutExpression();
+
+ @ManagedAttribute(description = "Completion interval in millis")
+ long getCompletionInterval();
+
+ @ManagedAttribute(description = "Completion size")
+ int getCompletionSize();
+
+ @ManagedAttribute(description = "Completion size expression")
+ String getCompletionSizeExpression();
+
+ @ManagedAttribute(description = "Complete from batch consumers")
+ boolean isCompletionFromBatchConsumer();
+
+ @ManagedAttribute(description = "Ignore invalid correlation keys")
+ boolean isIgnoreInvalidCorrelationKeys();
+
+ @ManagedAttribute(description = "Whether to close the correlation group on completion")
+ Integer getCloseCorrelationKeyOnCompletion();
+
+ @ManagedAttribute(description = "Parallel mode")
+ boolean isParallelProcessing();
+
+ @ManagedAttribute(description = "Optimistic locking")
+ boolean isOptimisticLocking();
+
+ @ManagedAttribute(description = "Whether or not to eager check for completion when a new incoming Exchange has been received")
+ boolean isEagerCheckCompletion();
+
+ @ManagedAttribute(description = "A Predicate to indicate when an aggregated exchange is complete")
+ String getCompletionPredicate();
+
+ @ManagedAttribute(description = "Whether or not exchanges which complete due to a timeout should be discarded")
+ boolean isDiscardOnCompletionTimeout();
+
+ @ManagedAttribute(description = "Indicates to complete all current aggregated exchanges when the context is stopped")
+ boolean isForceCompletionOnStop();
+
+ @ManagedAttribute(description = "Number of completed exchanges which are currently in-flight")
+ int getInProgressCompleteExchanges();
+
+ @ManagedOperation(description = "Number of groups currently in the aggregation repository")
+ int aggregationRepositoryGroups();
+
+ @ManagedOperation(description = "To force completing a specific group by its key")
+ int forceCompletionOfGroup(String key);
+
+ @ManagedOperation(description = "To force complete of all groups")
+ int forceCompletionOfAllGroups();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index e4bff8b..76a169c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -31,6 +31,7 @@ import org.apache.camel.Service;
import org.apache.camel.component.bean.BeanProcessor;
import org.apache.camel.component.log.LogEndpoint;
import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedAggregateProcessor;
import org.apache.camel.management.mbean.ManagedBeanProcessor;
import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
import org.apache.camel.management.mbean.ManagedCamelContext;
@@ -58,6 +59,7 @@ import org.apache.camel.processor.ErrorHandler;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Throttler;
import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.spi.EventNotifier;
@@ -197,6 +199,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
} else if (target instanceof IdempotentConsumer) {
answer = new ManagedIdempotentConsumer(context, (IdempotentConsumer) target, definition);
+ } else if (target instanceof AggregateProcessor) {
+ answer = new ManagedAggregateProcessor(context, (AggregateProcessor) target, (org.apache.camel.model.AggregateDefinition) definition);
} else if (target instanceof org.apache.camel.spi.ManagementAware) {
return ((org.apache.camel.spi.ManagementAware<Processor>) target).getManagedObject(processor);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
new file mode 100644
index 0000000..3b09c52
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Set;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedAggregateProcessorMBean;
+import org.apache.camel.model.AggregateDefinition;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
+import org.apache.camel.spi.ManagementStrategy;
+
+/**
+ * @version
+ */
+@ManagedResource(description = "Managed AggregateProcessor")
+public class ManagedAggregateProcessor extends ManagedProcessor implements ManagedAggregateProcessorMBean {
+ private final AggregateProcessor processor;
+
+ public ManagedAggregateProcessor(CamelContext context, AggregateProcessor processor, AggregateDefinition definition) {
+ super(context, processor, definition);
+ this.processor = processor;
+ }
+
+ public void init(ManagementStrategy strategy) {
+ super.init(strategy);
+ }
+
+ public AggregateProcessor getProcessor() {
+ return processor;
+ }
+
+ public String getCorrelationExpression() {
+ if (processor.getCorrelationExpression() != null) {
+ return processor.getCorrelationExpression().toString();
+ } else {
+ return null;
+ }
+ }
+
+ public long getCompletionTimeout() {
+ return processor.getCompletionTimeout();
+ }
+
+ public String getCompletionTimeoutExpression() {
+ if (processor.getCompletionTimeoutExpression() != null) {
+ return processor.getCompletionTimeoutExpression().toString();
+ } else {
+ return null;
+ }
+ }
+
+ public long getCompletionInterval() {
+ return processor.getCompletionInterval();
+ }
+
+ public int getCompletionSize() {
+ return processor.getCompletionSize();
+ }
+
+ public String getCompletionSizeExpression() {
+ if (processor.getCompletionSizeExpression() != null) {
+ return processor.getCompletionSizeExpression().toString();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isCompletionFromBatchConsumer() {
+ return processor.isCompletionFromBatchConsumer();
+ }
+
+ public boolean isIgnoreInvalidCorrelationKeys() {
+ return processor.isIgnoreInvalidCorrelationKeys();
+ }
+
+ public Integer getCloseCorrelationKeyOnCompletion() {
+ return processor.getCloseCorrelationKeyOnCompletion();
+ }
+
+ public boolean isParallelProcessing() {
+ return processor.isParallelProcessing();
+ }
+
+ public boolean isOptimisticLocking() {
+ return processor.isOptimisticLocking();
+ }
+
+ public boolean isEagerCheckCompletion() {
+ return processor.isEagerCheckCompletion();
+ }
+
+ public String getCompletionPredicate() {
+ if (processor.getCompletionPredicate() != null) {
+ return processor.getCompletionPredicate().toString();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isDiscardOnCompletionTimeout() {
+ return processor.isDiscardOnCompletionTimeout();
+ }
+
+ public boolean isForceCompletionOnStop() {
+ return processor.isCompletionFromBatchConsumer();
+ }
+
+ public int getInProgressCompleteExchanges() {
+ return processor.getInProgressCompleteExchanges();
+ }
+
+ public int aggregationRepositoryGroups() {
+ Set<String> keys = processor.getAggregationRepository().getKeys();
+ if (keys != null) {
+ return keys.size();
+ } else {
+ return 0;
+ }
+ }
+
+ public int forceCompletionOfGroup(String key) {
+ if (processor.getAggregateController() != null) {
+ return processor.getAggregateController().forceCompletionOfGroup(key);
+ } else {
+ return 0;
+ }
+ }
+
+ public int forceCompletionOfAllGroups() {
+ if (processor.getAggregateController() != null) {
+ return processor.getAggregateController().forceCompletionOfAllGroups();
+ } else {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 33c97b4..1a773ea 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -134,6 +134,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private ProducerTemplate deadLetterProducerTemplate;
+ @Deprecated
+ public AggregateProcessor(CamelContext camelContext, Processor processor,
+ Expression correlationExpression, AggregationStrategy aggregationStrategy,
+ ExecutorService executorService, boolean shutdownExecutorService) {
+ this(camelContext, "aggregate", processor, correlationExpression, aggregationStrategy, executorService, shutdownExecutorService);
+ }
+
public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
Expression correlationExpression, AggregationStrategy aggregationStrategy,
ExecutorService executorService, boolean shutdownExecutorService) {
@@ -618,6 +625,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
timeoutMap.put(key, exchange.getExchangeId(), timeout);
}
+ public int getInProgressCompleteExchanges() {
+ return inProgressCompleteExchanges.size();
+ }
+
public Predicate getCompletionPredicate() {
return completionPredicate;
}
@@ -1130,9 +1141,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
ServiceHelper.startService(timeoutMap);
}
- if (aggregateController != null) {
- aggregateController.onStart(id, this);
+ if (aggregateController == null) {
+ aggregateController = new DefaultAggregateController();
}
+ aggregateController.onStart(id, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
new file mode 100644
index 0000000..7860eca
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.DefaultAggregateController;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ManagedAggregateControllerTest extends ManagementTestSupport {
+
+ private AggregateController controller = new DefaultAggregateController();
+
+ @Test
+ public void testForceCompletionOfAll() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"myAggregator\"");
+ assertTrue(mbeanServer.isRegistered(on));
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(2);
+ getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+ Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+ assertEquals(2, pending.intValue());
+
+ Integer groups = (Integer) mbeanServer.invoke(on, "forceCompletionOfAllGroups", null, null);
+ assertEquals(2, groups.intValue());
+
+ assertMockEndpointsSatisfied();
+
+ Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
+ assertEquals(4, completed.longValue());
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
+ assertEquals(10, size.longValue());
+
+ String cor = (String) mbeanServer.getAttribute(on, "CorrelationExpression");
+ assertEquals("header(id)", cor);
+
+ Integer inflight = (Integer) mbeanServer.getAttribute(on, "InProgressCompleteExchanges");
+ assertEquals(0, inflight.intValue());
+
+ pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+ assertEquals(0, pending.intValue());
+ }
+
+ @Test
+ public void testForceCompletionOfGroup() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MBeanServer mbeanServer = getMBeanServer();
+
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"myAggregator\"");
+ assertTrue(mbeanServer.isRegistered(on));
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+ assertMockEndpointsSatisfied();
+
+ getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+ getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+
+ Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+ assertEquals(2, pending.intValue());
+
+ Integer groups = (Integer) mbeanServer.invoke(on, "forceCompletionOfGroup", new Object[]{"1"}, new String[]{"java.lang.String"});
+ assertEquals(1, groups.intValue());
+
+ assertMockEndpointsSatisfied();
+
+ Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
+ assertEquals(4, completed.longValue());
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
+ assertEquals(10, size.longValue());
+
+ String cor = (String) mbeanServer.getAttribute(on, "CorrelationExpression");
+ assertEquals("header(id)", cor);
+
+ Integer inflight = (Integer) mbeanServer.getAttribute(on, "InProgressCompleteExchanges");
+ assertEquals(0, inflight.intValue());
+
+ pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
+ assertEquals(1, pending.intValue());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller).id("myAggregator")
+ .completionSize(10)
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index 9ae8b61..e541442 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -61,7 +61,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(false);
ap.start();
@@ -102,7 +102,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().isEqualTo("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setEagerCheckCompletion(true);
ap.start();
@@ -150,7 +150,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionSize(3);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -198,7 +198,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionTimeout(3000);
ap.setEagerCheckCompletion(eager);
ap.start();
@@ -247,7 +247,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionInterval(3000);
ap.start();
@@ -288,7 +288,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setIgnoreInvalidCorrelationKeys(true);
@@ -328,7 +328,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.start();
@@ -373,7 +373,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
AggregationStrategy as = new BodyInAggregatingStrategy();
Predicate complete = body().contains("END");
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionPredicate(complete);
ap.setCloseCorrelationKeyOnCompletion(1000);
@@ -418,7 +418,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionSize(100);
ap.setCompletionFromBatchConsumer(true);
@@ -515,7 +515,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setEagerCheckCompletion(true);
ap.setCompletionPredicate(body().isEqualTo("END"));
if (handler != null) {
@@ -566,7 +566,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
ap.setCompletionSize(10);
ap.start();
http://git-wip-us.apache.org/repos/asf/camel/blob/a664b08c/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
index 987f9a6..45f0c47 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTimeoutCompletionRestartTest.java
@@ -59,7 +59,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeout(2000);
ap.start();
@@ -101,7 +101,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();
@@ -145,7 +145,7 @@ public class AggregateProcessorTimeoutCompletionRestartTest extends ContextTestS
Expression corr = header("id");
AggregationStrategy as = new BodyInAggregatingStrategy();
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService, true);
+ AggregateProcessor ap = new AggregateProcessor(context, "a", done, corr, as, executorService, true);
// start with a high timeout so no completes before we stop
ap.setCompletionTimeoutExpression(header("myTimeout"));
ap.start();