You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/21 11:12:07 UTC

[1/2] camel git commit: CAMEL-8525: Aggregate - Expose statistics about completed exchanges. Rename forceCompletion to force so it's similar to the others.

Repository: camel
Updated Branches:
  refs/heads/master c0f1f02c1 -> 16cc8a920


CAMEL-8525: Aggregate - Expose statistics about completed exchanges. Rename forceCompletion to force so it's similar to the others.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94a6f9a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94a6f9a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94a6f9a0

Branch: refs/heads/master
Commit: 94a6f9a0c79a3981630f1e64cb0f3ae4281c50b4
Parents: c0f1f02
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 10:49:52 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 10:49:52 2015 +0100

----------------------------------------------------------------------
 .../mbean/ManagedAggregateProcessorMBean.java   |  37 +++++++
 .../mbean/ManagedAggregateProcessor.java        |  40 +++++++
 .../processor/aggregate/AggregateProcessor.java | 105 ++++++++++++++++++-
 .../aggregate/AggregateProcessorStatistics.java |  86 +++++++++++++++
 .../ManagedAggregateControllerTest.java         |  28 ++++-
 .../aggregator/AggregateControllerTest.java     |   4 +-
 .../AggregateForceCompletionHeaderTest.java     |   4 +-
 .../aggregator/AggregateProcessorTest.java      |   2 +-
 ...awtDBAggregateForceCompletionHeaderTest.java |   4 +-
 ...velDBAggregateForceCompletionHeaderTest.java |   4 +-
 .../JdbcAggregateForceCompletionHeaderTest.java |   4 +-
 11 files changed, 303 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index f4bed8d..a1810c7 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -78,4 +78,41 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedOperation(description = "To force complete of all groups")
     int forceCompletionOfAllGroups();
 
+    @ManagedAttribute(description = "Total number of exchanges arrived into the aggregator")
+    long getTotalIn();
+
+    @ManagedAttribute(description = "Total number of exchanges completed and outgoing from the aggregator")
+    long getTotalCompleted();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion size trigger")
+    long getCompletedBySize();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion aggregation strategy trigger")
+    long getCompletedByStrategy();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion interval trigger")
+    long getCompletedByInterval();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion timeout trigger")
+    long getCompletedByTimeout();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion predicate trigger")
+    long getCompletedByPredicate();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion batch consumer trigger")
+    long getCompletedByBatchConsumer();
+
+    @ManagedAttribute(description = "Total number of exchanges completed by completion force trigger")
+    long getCompletedByForce();
+
+    @ManagedOperation(description = "Reset the statistics counters")
+    void resetStatistics();
+
+    @ManagedAttribute(description = "Sets whether statistics is enabled")
+    boolean isStatisticsEnabled();
+
+    @ManagedAttribute(description = "Sets whether statistics is enabled")
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index 3b09c52..7a7705d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -149,4 +149,44 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
             return 0;
         }
     }
+
+    public long getTotalIn() {
+        return processor.getStatistics().getTotalIn();
+    }
+
+    public long getTotalCompleted() {
+        return processor.getStatistics().getTotalCompleted();
+    }
+
+    public long getCompletedBySize() {
+        return processor.getStatistics().getCompletedBySize();
+    }
+
+    public long getCompletedByStrategy() {
+        return processor.getStatistics().getCompletedByStrategy();
+    }
+
+    public long getCompletedByInterval() {
+        return processor.getStatistics().getCompletedByInterval();
+    }
+
+    public long getCompletedByTimeout() {
+        return processor.getStatistics().getCompletedByTimeout();
+    }
+
+    public long getCompletedByPredicate() {
+        return processor.getStatistics().getCompletedByPredicate();
+    }
+
+    public long getCompletedByBatchConsumer() {
+        return processor.getStatistics().getCompletedByBatchConsumer();
+    }
+
+    public long getCompletedByForce() {
+        return processor.getStatistics().getCompletedByForce();
+    }
+
+    public void resetStatistics() {
+        processor.getStatistics().reset();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 8e34284..73bf0da 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -106,11 +107,92 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
 
+    private final AggregateProcessorStatistics statistics = new Statistics();
+    private final AtomicLong totalIn = new AtomicLong();
+    private final AtomicLong totalCompleted = new AtomicLong();
+    private final AtomicLong completedBySize = new AtomicLong();
+    private final AtomicLong completedByStrategy = new AtomicLong();
+    private final AtomicLong completedByInterval = new AtomicLong();
+    private final AtomicLong completedByTimeout = new AtomicLong();
+    private final AtomicLong completedByPredicate = new AtomicLong();
+    private final AtomicLong completedByBatchConsumer = new AtomicLong();
+    private final AtomicLong completedByForce = new AtomicLong();
+
     // keep booking about redelivery
     private class RedeliveryData {
         int redeliveryCounter;
     }
 
+    /**
+     * Exposes the counters of the enclosing processor through
+     * {@link AggregateProcessorStatistics}, together with the enabled flag.
+     */
+    private class Statistics implements AggregateProcessorStatistics {
+
+        private boolean statisticsEnabled = true;
+
+        public long getTotalIn() {
+            return totalIn.get();
+        }
+
+        public long getTotalCompleted() {
+            return totalCompleted.get();
+        }
+
+        public long getCompletedBySize() {
+            return completedBySize.get();
+        }
+
+        public long getCompletedByStrategy() {
+            return completedByStrategy.get();
+        }
+
+        public long getCompletedByInterval() {
+            return completedByInterval.get();
+        }
+
+        public long getCompletedByTimeout() {
+            return completedByTimeout.get();
+        }
+
+        public long getCompletedByPredicate() {
+            return completedByPredicate.get();
+        }
+
+        public long getCompletedByBatchConsumer() {
+            return completedByBatchConsumer.get();
+        }
+
+        public long getCompletedByForce() {
+            return completedByForce.get();
+        }
+
+        /**
+         * Resets every counter to zero. The counters are cleared one by one,
+         * so a concurrent reader may briefly see a partially reset set.
+         */
+        public void reset() {
+            totalIn.set(0);
+            totalCompleted.set(0);
+            completedBySize.set(0);
+            completedByStrategy.set(0);
+            // the interval counter must be cleared as well, or it keeps its old value after a reset
+            completedByInterval.set(0);
+            completedByTimeout.set(0);
+            completedByPredicate.set(0);
+            completedByBatchConsumer.set(0);
+            completedByForce.set(0);
+        }
+
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
     // options
     private boolean ignoreInvalidCorrelationKeys;
     private Integer closeCorrelationKeyOnCompletion;
@@ -187,6 +259,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     protected void doProcess(Exchange exchange) throws Exception {
 
+        if (getStatistics().isStatisticsEnabled()) {
+            totalIn.incrementAndGet();
+        }
+
         //check for the special header to force completion of all groups (and ignore the exchange otherwise)
         boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
         if (completeAllGroups) {
@@ -541,6 +617,27 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
         }
 
+        if (getStatistics().isStatisticsEnabled()) {
+            totalCompleted.incrementAndGet();
+
+            String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class);
+            if ("interval".equals(completedBy)) {
+                completedByInterval.incrementAndGet();
+            } else if ("timeout".equals(completedBy)) {
+                completedByTimeout.incrementAndGet();
+            } else if ("force".equals(completedBy)) {
+                completedByForce.incrementAndGet();
+            } else if ("consumer".equals(completedBy)) {
+                completedByBatchConsumer.incrementAndGet();
+            } else if ("predicate".equals(completedBy)) {
+                completedByPredicate.incrementAndGet();
+            } else if ("size".equals(completedBy)) {
+                completedBySize.incrementAndGet();
+            } else if ("strategy".equals(completedBy)) {
+                completedByStrategy.incrementAndGet();
+            }
+        }
+
         // send this exchange
         executorService.submit(new Runnable() {
             public void run() {
@@ -610,6 +707,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         timeoutMap.put(key, exchange.getExchangeId(), timeout);
     }
 
+    public AggregateProcessorStatistics getStatistics() {
+        return statistics;
+    }
+
     public int getInProgressCompleteExchanges() {
         return inProgressCompleteExchanges.size();
     }
@@ -1217,7 +1318,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 total = 1;
                 LOG.trace("Force completion triggered for correlation key: {}", key);
                 // indicate it was completed by a force completion request
-                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
                 Exchange answer = onCompletion(key, exchange, exchange, false);
                 if (answer != null) {
                     onSubmitCompletion(key, answer);
@@ -1259,7 +1360,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                     if (exchange != null) {
                         LOG.trace("Force completion triggered for correlation key: {}", key);
                         // indicate it was completed by a force completion request
-                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
                         Exchange answer = onCompletion(key, exchange, exchange, false);
                         if (answer != null) {
                             onSubmitCompletion(key, answer);

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
new file mode 100644
index 0000000..827a594
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * Various statistics of the aggregator
+ */
+public interface AggregateProcessorStatistics {
+
+    /**
+     * Total number of exchanges arrived into the aggregator
+     */
+    long getTotalIn();
+
+    /**
+     * Total number of exchanges completed and outgoing from the aggregator
+     */
+    long getTotalCompleted();
+
+    /**
+     * Total number of exchanges completed by completion size trigger
+     */
+    long getCompletedBySize();
+
+    /**
+     * Total number of exchanges completed by completion strategy trigger
+     */
+    long getCompletedByStrategy();
+
+    /**
+     * Total number of exchanges completed by completion interval trigger
+     */
+    long getCompletedByInterval();
+
+    /**
+     * Total number of exchanges completed by completion timeout trigger
+     */
+    long getCompletedByTimeout();
+
+    /**
+     * Total number of exchanges completed by completion predicate trigger
+     */
+    long getCompletedByPredicate();
+
+    /**
+     * Total number of exchanges completed by completion batch consumer trigger
+     */
+    long getCompletedByBatchConsumer();
+
+    /**
+     * Total number of exchanges completed by completion force trigger
+     */
+    long getCompletedByForce();
+
+    /**
+     * Reset the counters
+     */
+    void reset();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     *
+     * @param statisticsEnabled <tt>true</tt> to enable
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
index 7860eca..2d783f6 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedAggregateControllerTest.java
@@ -54,7 +54,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
         assertEquals(2, pending.intValue());
@@ -67,6 +67,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
         Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
         assertEquals(4, completed.longValue());
 
+        completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+        assertEquals(2, completed.longValue());
+
+        Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+        assertEquals(4, in.longValue());
+
+        Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce");
+        assertEquals(2, byForced.longValue());
+
+        Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+        assertEquals(0, bySize.longValue());
+
         Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
         assertEquals(10, size.longValue());
 
@@ -103,7 +115,7 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(1);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         Integer pending = (Integer) mbeanServer.invoke(on, "aggregationRepositoryGroups", null, null);
         assertEquals(2, pending.intValue());
@@ -116,6 +128,18 @@ public class ManagedAggregateControllerTest extends ManagementTestSupport {
         Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
         assertEquals(4, completed.longValue());
 
+        completed = (Long) mbeanServer.getAttribute(on, "TotalCompleted");
+        assertEquals(1, completed.longValue());
+
+        Long in = (Long) mbeanServer.getAttribute(on, "TotalIn");
+        assertEquals(4, in.longValue());
+
+        Long byForced = (Long) mbeanServer.getAttribute(on, "CompletedByForce");
+        assertEquals(1, byForced.longValue());
+
+        Long bySize = (Long) mbeanServer.getAttribute(on, "CompletedBySize");
+        assertEquals(0, bySize.longValue());
+
         Integer size = (Integer) mbeanServer.getAttribute(on, "CompletionSize");
         assertEquals(10, size.longValue());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index e9dee08..dbdad1f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -51,7 +51,7 @@ public class AggregateControllerTest extends ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         int groups = getAggregateController().forceCompletionOfAllGroups();
         assertEquals(2, groups);
@@ -72,7 +72,7 @@ public class AggregateControllerTest extends ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(1);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         int groups = getAggregateController().forceCompletionOfGroup("1");
         assertEquals(1, groups);

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
index 308fb47..2971825 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionHeaderTest.java
@@ -44,7 +44,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -66,7 +66,7 @@ public class AggregateForceCompletionHeaderTest extends ContextTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send a message to trigger completion of all groups, message should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
index 9ae8b61..daef63d 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
@@ -560,7 +560,7 @@ public class AggregateProcessorTest extends ContextTestSupport {
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceivedInAnyOrder("B+END", "A+END");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         Processor done = new SendProcessor(context.getEndpoint("mock:result"));
         Expression corr = header("id");

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
index ee646e2..2450856 100644
--- a/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class HawtDBAggregateForceCompletionHeaderTest extends CamelTestSupport {
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send a message to trigger completion of all groups, message should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
index b88e4d4..699c7eb 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateForceCompletionHeaderTest.java
@@ -50,7 +50,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -72,7 +72,7 @@ public class LevelDBAggregateForceCompletionHeaderTest extends CamelTestSupport
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send a message to trigger completion of all groups, message should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/camel/blob/94a6f9a0/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
index 7a62cfb..7156a3c 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateForceCompletionHeaderTest.java
@@ -42,7 +42,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(2);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send the signal message to trigger completion of all groups, message should NOT be aggregated
         template.sendBodyAndHeader("direct:start", "test5", Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
@@ -64,7 +64,7 @@ public class JdbcAggregateForceCompletionHeaderTest extends AbstractJdbcAggregat
 
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3", "test2test4", "test5");
-        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "force");
 
         //now send a message to trigger completion of all groups, message should be aggregated
         Map<String, Object> headers = new HashMap<String, Object>();


[2/2] camel git commit: Fixed test

Posted by da...@apache.org.
Fixed test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16cc8a92
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16cc8a92
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16cc8a92

Branch: refs/heads/master
Commit: 16cc8a9202a756def094735cee6356d306c20b03
Parents: 94a6f9a
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Mar 21 11:13:52 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Mar 21 11:13:52 2015 +0100

----------------------------------------------------------------------
 .../processor/async/AsyncProcessorAwaitManagerInterruptTest.java    | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16cc8a92/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index 2e24346..3d5939a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -42,7 +42,6 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
 
         try {
             template.requestBody("direct:start", "Hello Camel", String.class);
-            fail("Should have thrown exception");
         } catch (CamelExecutionException e) {
             RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
             assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));