You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 09:21:36 UTC
[camel] 02/02: CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4fdd73f592d496789afb95b767703dca741ef4dd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:21:16 2019 +0200
CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
.../camel/processor/aggregate/AggregateProcessor.java | 13 ++++++++++++-
.../processor/aggregate/AggregateProcessorStatistics.java | 5 +++++
.../management/mbean/ManagedAggregateProcessorMBean.java | 3 +++
.../camel/management/mbean/ManagedAggregateProcessor.java | 5 +++++
4 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index edcd71c..ad0f96d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -133,6 +133,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
private final AtomicLong completedByPredicate = new AtomicLong();
private final AtomicLong completedByBatchConsumer = new AtomicLong();
private final AtomicLong completedByForce = new AtomicLong();
+ private final AtomicLong discarded = new AtomicLong();
// keep booking about redelivery
private class RedeliveryData {
@@ -189,6 +190,11 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
}
@Override
+ public long getDiscarded() {
+ return discarded.get();
+ }
+
+ @Override
public void reset() {
totalIn.set(0);
totalCompleted.set(0);
@@ -198,6 +204,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
completedByPredicate.set(0);
completedByBatchConsumer.set(0);
completedByForce.set(0);
+ discarded.set(0);
}
@Override
@@ -733,6 +740,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
Exchange answer;
if (fromTimeout && isDiscardOnCompletionTimeout()) {
+ // this exchange is discarded
+ discarded.incrementAndGet();
// discard due timeout
log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
// must confirm the discarded exchange
@@ -742,7 +751,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
// the completion was from timeout and we should just discard it
answer = null;
} else if (aggregateFailed && isDiscardOnAggregationFailure()) {
- // discard due aggregation failed
+ // this exchange is discarded
+ discarded.incrementAndGet();
+ // discard due aggregation failed (or by force)
log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
// must confirm the discarded exchange
aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
index 9cb3d50..c082613 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -67,6 +67,11 @@ public interface AggregateProcessorStatistics {
long getCompletedByForce();
/**
+ * Total number of exchanges discarded
+ */
+ long getDiscarded();
+
+ /**
* Reset the counters
*/
void reset();
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fabf1d3..1db37b2 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -138,6 +138,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
@ManagedAttribute(description = "Total number of exchanged completed by completion force trigger")
long getCompletedByForce();
+ @ManagedAttribute(description = "Total number of exchanged discarded")
+ long getDiscarded();
+
@ManagedOperation(description = " Reset the statistics counters")
void resetStatistics();
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e509319..468c00e 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -298,6 +298,11 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
}
@Override
+ public long getDiscarded() {
+ return processor.getStatistics().getDiscarded();
+ }
+
+ @Override
public void resetStatistics() {
processor.getStatistics().reset();
}