You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 11:12:07 UTC
[1/2] camel git commit: CAMEL-8525: Aggregate - Expose statistics
about completed exchanges. Rename forceCompletion to force so its similar to
the others.
Repository: camel
Updated Branches:
refs/heads/master c0f1f02c1 -> 16cc8a920
CAMEL-8525: Aggregate - Expose statistics about completed exchanges. Rename forceCompletion to force so its similar to the others.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94a6f9a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94a6f9a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94a6f9a0
Branch: refs/heads/master
Commit: 94a6f9a0c79a3981630f1e64cb0f3ae4281c50b4
Parents: c0f1f02
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 10:49:52 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 10:49:52 2015 +0100
----------------------------------------------------------------------
.../mbean/ManagedAggregateProcessorMBean.java | 37 +++++++
.../mbean/ManagedAggregateProcessor.java | 40 +++++++
.../processor/aggregate/AggregateProcessor.java | 105 ++++++++++++++++++-
.../aggregate/AggregateProcessorStatistics.java | 86 +++++++++++++++
.../ManagedAggregateControllerTest.java | 28 ++++-
.../aggregator/AggregateControllerTest.java | 4 +-
.../AggregateForceCompletionHeaderTest.java | 4 +-
.../aggregator/AggregateProcessorTest.java | 2 +-
...awtDBAggregateForceCompletionHeaderTest.java | 4 +-
...velDBAggregateForceCompletionHeaderTest.java | 4 +-
.../JdbcAggregateForceCompletionHeaderTest.java | 4 +-
11 files changed, 303 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index f4bed8d..a1810c7 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -78,4 +78,41 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
@ManagedOperation(description = "To force complete of all groups")
int forceCompletionOfAllGroups();
+ @ManagedAttribute(description = "Total number of exchanges arrived into the aggregator")
+ long getTotalIn();
+
+ @ManagedAttribute(description = "Total number of exchanges completed and outgoing from the aggregator")
+ long getTotalCompleted();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion size trigger")
+ long getCompletedBySize();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion aggregation strategy trigger")
+ long getCompletedByStrategy();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion interval (timeout) trigger")
+ long getCompletedByInterval();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion timeout trigger")
+ long getCompletedByTimeout();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion predicate trigger")
+ long getCompletedByPredicate();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion batch consumer trigger")
+ long getCompletedByBatchConsumer();
+
+ @ManagedAttribute(description = "Total number of exchanged completed by completion force trigger")
+ long getCompletedByForce();
+
+ @ManagedOperation(description = " Reset the statistics counters")
+ void resetStatistics();
+
+ @ManagedAttribute(description = "Sets whether statistics is enabled")
+ boolean isStatisticsEnabled();
+
+ @ManagedAttribute(description = "Sets whether statistics is enabled")
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index 3b09c52..7a7705d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -149,4 +149,44 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
return 0;
}
}
+
+ public long getTotalIn() {
+ return processor.getStatistics().getTotalIn();
+ }
+
+ public long getTotalCompleted() {
+ return processor.getStatistics().getTotalCompleted();
+ }
+
+ public long getCompletedBySize() {
+ return processor.getStatistics().getCompletedBySize();
+ }
+
+ public long getCompletedByStrategy() {
+ return processor.getStatistics().getCompletedByStrategy();
+ }
+
+ public long getCompletedByInterval() {
+ return processor.getStatistics().getCompletedByInterval();
+ }
+
+ public long getCompletedByTimeout() {
+ return processor.getStatistics().getCompletedByTimeout();
+ }
+
+ public long getCompletedByPredicate() {
+ return processor.getStatistics().getCompletedByPredicate();
+ }
+
+ public long getCompletedByBatchConsumer() {
+ return processor.getStatistics().getCompletedByBatchConsumer();
+ }
+
+ public long getCompletedByForce() {
+ return processor.getStatistics().getCompletedByForce();
+ }
+
+ public void resetStatistics() {
+ processor.getStatistics().reset();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 8e34284..73bf0da 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -106,11 +107,82 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
+ private final AggregateProcessorStatistics statistics = new Statistics();
+ private final AtomicLong totalIn = new AtomicLong();
+ private final AtomicLong totalCompleted = new AtomicLong();
+ private final AtomicLong completedBySize = new AtomicLong();
+ private final AtomicLong completedByStrategy = new AtomicLong();
+ private final AtomicLong completedByInterval = new AtomicLong();
+ private final AtomicLong completedByTimeout = new AtomicLong();
+ private final AtomicLong completedByPredicate = new AtomicLong();
+ private final AtomicLong completedByBatchConsumer = new AtomicLong();
+ private final AtomicLong completedByForce = new AtomicLong();
+
// keep booking about redelivery
private class RedeliveryData {
int redeliveryCounter;
}
+ private class Statistics implements AggregateProcessorStatistics {
+
+ private boolean statisticsEnabled = true;
+
+ public long getTotalIn() {
+ return totalIn.get();
+ }
+
+ public long getTotalCompleted() {
+ return totalCompleted.get();
+ }
+
+ public long getCompletedBySize() {
+ return completedBySize.get();
+ }
+
+ public long getCompletedByStrategy() {
+ return completedByStrategy.get();
+ }
+
+ public long getCompletedByInterval() {
+ return completedByInterval.get();
+ }
+
+ public long getCompletedByTimeout() {
+ return completedByTimeout.get();
+ }
+
+ public long getCompletedByPredicate() {
+ return completedByPredicate.get();
+ }
+
+ public long getCompletedByBatchConsumer() {
+ return completedByBatchConsumer.get();
+ }
+
+ public long getCompletedByForce() {
+ return completedByForce.get();
+ }
+
+ public void reset() {
+ totalIn.set(0);
+ totalCompleted.set(0);
+ completedBySize.set(0);
+ completedByStrategy.set(0);
+ completedByTimeout.set(0);
+ completedByPredicate.set(0);
+ completedByBatchConsumer.set(0);
+ completedByForce.set(0);
+ }
+
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+ }
+
// options
private boolean ignoreInvalidCorrelationKeys;
private Integer closeCorrelationKeyOnCompletion;
@@ -187,6 +259,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
protected void doProcess(Exchange exchange) throws Exception {
+ if (getStatistics().isStatisticsEnabled()) {
+ totalIn.incrementAndGet();
+ }
+
//check for the special header to force completion of all groups (and ignore the exchange otherwise)
boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
if (completeAllGroups) {
@@ -541,6 +617,27 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
}
+ if (getStatistics().isStatisticsEnabled()) {
+ totalCompleted.incrementAndGet();
+
+ String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class);
+ if ("interval".equals(completedBy)) {
+ completedByInterval.incrementAndGet();
+ } else if ("timeout".equals(completedBy)) {
+ completedByTimeout.incrementAndGet();
+ } else if ("force".equals(completedBy)) {
+ completedByForce.incrementAndGet();
+ } else if ("consumer".equals(completedBy)) {
+ completedByBatchConsumer.incrementAndGet();
+ } else if ("predicate".equals(completedBy)) {
+ completedByPredicate.incrementAndGet();
+ } else if ("size".equals(completedBy)) {
+ completedBySize.incrementAndGet();
+ } else if ("strategy".equals(completedBy)) {
+ completedByStrategy.incrementAndGet();
+ }
+ }
+
// send this exchange
executorService.submit(new Runnable() {
public void run() {
@@ -610,6 +707,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
timeoutMap.put(key, exchange.getExchangeId(), timeout);
}
+ public AggregateProcessorStatistics getStatistics() {
+ return statistics;
+ }
+
public int getInProgressCompleteExchanges() {
return inProgressCompleteExchanges.size();
}
@@ -1217,7 +1318,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
total = 1;
LOG.trace("Force completion triggered for correlation key: {}", key);
// indicate it was completed by a force completion request
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
Exchange answer = onCompletion(key, exchange, exchange, false);
if (answer != null) {
onSubmitCompletion(key, answer);
@@ -1259,7 +1360,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
if (exchange != null) {
LOG.trace("Force completion triggered for correlation key: {}", key);
// indicate it was completed by a force completion request
- exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
Exchange answer = onCompletion(key, exchange, exchange, false);
if (answer != null) {
onSubmitCompletion(key, answer);
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
new file mode 100644
index 0000000..827a594
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * Various statistics of the aggregator
+ */
+public interface AggregateProcessorStatistics {
+
+ /**
+ * Total number of exchanges arrived into the aggregator
+ */
+ long getTotalIn();
+
+ /**
+ * Total number of exchanges completed and outgoing from the aggregator
+ */
+ long getTotalCompleted();
+
+ /**
+ * Total number of exchanged completed by completion size trigger
+ */
+ long getCompletedBySize();
+
+ /**
+ * Total number of exchanged completed by completion strategy trigger
+ */
+ long getCompletedByStrategy();
+
+ /**
+ * Total number of exchanged completed by completion interval trigger
+ */
+ long getCompletedByInterval();
+
+ /**
+ * Total number of exchanged completed by completion timeout trigger
+ */
+ long getCompletedByTimeout();
+
+ /**
+ * Total number of exchanged completed by completion predicate trigger
+ */
+ long getCompletedByPredicate();
+
+ /**
+ * Total number of exchanged completed by completion batch consumer trigger
+ */
+ long getCompletedByBatchConsumer();
+
+ /**
+ * Total number of exchanged completed by completion force trigger
+ */
+ long getCompletedByForce();
+
+ /**
+ * Reset the counters
+ */
+ void reset();
+
+ /**
+ * Whether statistics is enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ *
+ * @param statisticsEnabled <tt>true</tt> to enable
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
index 7860eca..2d783f6 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
@@ -54,7 +54,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
assertEquals(2, pending.intValue());
@@ -67,6 +67,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
assertEquals(4, completed.longValue());
+ completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+ assertEquals(2, completed.longValue());
+
+ Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+ assertEquals(4, in.longValue());
+
+ Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce");
+ assertEquals(2, byForced.longValue());
+
+ Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+ assertEquals(0, bySize.longValue());
+
Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
assertEquals(10, size.longValue());
@@ -103,7 +115,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(1);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
assertEquals(2, pending.intValue());
@@ -116,6 +128,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
assertEquals(4, completed.longValue());
+ completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+ assertEquals(1, completed.longValue());
+
+ Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+ assertEquals(4, in.longValue());
+
+ Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce");
+ assertEquals(1, byForced.longValue());
+
+ Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+ assertEquals(0, bySize.longValue());
+
Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
assertEquals(10, size.longValue());
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index e9dee08..dbdad1f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -51,7 +51,7 @@ public class AggregateControllerTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
int groups = getAggregateController().forceCompletionOfAllGroups();
assertEquals(2, groups);
@@ -72,7 +72,7 @@ public class AggregateControllerTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(1);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
int groups = getAggregateController().forceCompletionOfGroup("1");
assertEquals(1, groups);
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 308fb47..2971825 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -44,7 +44,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -66,7 +66,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send a message to trigger completion of all groups, message should be aggregated
Map<String, Object> headers = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index 9ae8b61..daef63d 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -560,7 +560,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceivedInAnyOrder("B+END", "A+END");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
Processor done = new SendProcessor(context.getEndpoint("mock:result"));
Expression corr = header("id");
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index ee646e2..2450856 100644
--- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send a message to trigger completion of all groups, message should be aggregated
Map<String, Object> headers = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index b88e4d4..699c7eb 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send a message to trigger completion of all groups, message should be aggregated
Map<String, Object> headers = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 7a62cfb..7156a3c 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -42,7 +42,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
getMockEndpoint("mock:aggregated").expectedMessageCount(2);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send the signal message to trigger completion of all groups, message should NOT be aggregated
template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -64,7 +64,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
getMockEndpoint("mock:aggregated").expectedMessageCount(3);
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
- getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+ getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
//now send a message to trigger completion of all groups, message should be aggregated
Map<String, Object> headers = new HashMap<String, Object>();
[2/2] camel git commit: Fixed test
Posted by da...@apache.org.
Fixed test
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16cc8a92
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16cc8a92
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16cc8a92
Branch: refs/heads/master
Commit: 16cc8a9202a756def094735cee6356d306c20b03
Parents: 94a6f9a
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 11:13:52 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 11:13:52 2015 +0100
----------------------------------------------------------------------
.../processor/async/AsyncProcessorAwaitManagerInterruptTest.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/16cc8a92/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index 2e24346..3d5939a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -42,7 +42,6 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
try {
template.requestBody("direct:start", "Hello Camel", String.class);
- fail("Should have thrown exception");
} catch (CamelExecutionException e) {
RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));