You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 09:21:36 UTC

[camel] 02/02: CAMEL-10533: AggregateController - Add forceDiscardOfGroup method

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4fdd73f592d496789afb95b767703dca741ef4dd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:21:16 2019 +0200

    CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
 .../camel/processor/aggregate/AggregateProcessor.java       | 13 ++++++++++++-
 .../processor/aggregate/AggregateProcessorStatistics.java   |  5 +++++
 .../management/mbean/ManagedAggregateProcessorMBean.java    |  3 +++
 .../camel/management/mbean/ManagedAggregateProcessor.java   |  5 +++++
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index edcd71c..ad0f96d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -133,6 +133,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private final AtomicLong completedByPredicate = new AtomicLong();
     private final AtomicLong completedByBatchConsumer = new AtomicLong();
     private final AtomicLong completedByForce = new AtomicLong();
+    private final AtomicLong discarded = new AtomicLong();
 
     // keep booking about redelivery
     private class RedeliveryData {
@@ -189,6 +190,11 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }
 
         @Override
+        public long getDiscarded() {
+            return discarded.get();
+        }
+
+        @Override
         public void reset() {
             totalIn.set(0);
             totalCompleted.set(0);
@@ -198,6 +204,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             completedByPredicate.set(0);
             completedByBatchConsumer.set(0);
             completedByForce.set(0);
+            discarded.set(0);
         }
 
         @Override
@@ -733,6 +740,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         Exchange answer;
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
+            // this exchange is discarded
+            discarded.incrementAndGet();
             // discard due timeout
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
@@ -742,7 +751,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             // the completion was from timeout and we should just discard it
             answer = null;
         } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
-            // discard due aggregation failed
+            // this exchange is discarded
+            discarded.incrementAndGet();
+            // discard due aggregation failed (or by force)
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
             aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
index 9cb3d50..c082613 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -67,6 +67,11 @@ public interface AggregateProcessorStatistics {
     long getCompletedByForce();
 
     /**
+     * Total number of exchanged discarded
+     */
+    long getDiscarded();
+
+    /**
      * Reset the counters
      */
     void reset();
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fabf1d3..1db37b2 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -138,6 +138,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Total number of exchanged completed by completion force trigger")
     long getCompletedByForce();
 
+    @ManagedAttribute(description = "Total number of exchanged discarded")
+    long getDiscarded();
+
     @ManagedOperation(description = " Reset the statistics counters")
     void resetStatistics();
 
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e509319..468c00e 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -298,6 +298,11 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
     }
 
     @Override
+    public long getDiscarded() {
+        return processor.getStatistics().getDiscarded();
+    }
+
+    @Override
     public void resetStatistics() {
         processor.getStatistics().reset();
     }