You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 09:21:34 UTC

[camel] branch master updated (080ad19 -> 4fdd73f)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 080ad19  CAMEL-10456: Restoring old TCCL should store old value regardless if it was null, as otherwise it can leak changing the TCCL.
     new 3e4cf07  CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
     new 4fdd73f  CAMEL-10533: AggregateController - Add forceDiscardOfGroup method

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../processor/aggregate/AggregateController.java   | 15 +++++
 .../processor/aggregate/AggregateProcessor.java    | 78 +++++++++++++++++++++-
 .../aggregate/AggregateProcessorStatistics.java    |  5 ++
 .../aggregate/DefaultAggregateController.java      | 17 +++++
 .../aggregator/AggregateControllerTest.java        | 63 ++++++++++++++++-
 .../mbean/ManagedAggregateProcessorMBean.java      |  9 +++
 .../mbean/ManagedAggregateProcessor.java           | 23 +++++++
 7 files changed, 207 insertions(+), 3 deletions(-)


[camel] 02/02: CAMEL-10533: AggregateController - Add forceDiscardOfGroup method

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4fdd73f592d496789afb95b767703dca741ef4dd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:21:16 2019 +0200

    CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
 .../camel/processor/aggregate/AggregateProcessor.java       | 13 ++++++++++++-
 .../processor/aggregate/AggregateProcessorStatistics.java   |  5 +++++
 .../management/mbean/ManagedAggregateProcessorMBean.java    |  3 +++
 .../camel/management/mbean/ManagedAggregateProcessor.java   |  5 +++++
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index edcd71c..ad0f96d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -133,6 +133,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private final AtomicLong completedByPredicate = new AtomicLong();
     private final AtomicLong completedByBatchConsumer = new AtomicLong();
     private final AtomicLong completedByForce = new AtomicLong();
+    private final AtomicLong discarded = new AtomicLong();
 
     // keep booking about redelivery
     private class RedeliveryData {
@@ -189,6 +190,11 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }
 
         @Override
+        public long getDiscarded() {
+            return discarded.get();
+        }
+
+        @Override
         public void reset() {
             totalIn.set(0);
             totalCompleted.set(0);
@@ -198,6 +204,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             completedByPredicate.set(0);
             completedByBatchConsumer.set(0);
             completedByForce.set(0);
+            discarded.set(0);
         }
 
         @Override
@@ -733,6 +740,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         Exchange answer;
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
+            // this exchange is discarded
+            discarded.incrementAndGet();
             // discard due timeout
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
@@ -742,7 +751,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             // the completion was from timeout and we should just discard it
             answer = null;
         } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
-            // discard due aggregation failed
+            // this exchange is discarded
+            discarded.incrementAndGet();
+            // discard due to aggregation failure (or by force)
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
             aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
index 9cb3d50..c082613 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -67,6 +67,11 @@ public interface AggregateProcessorStatistics {
     long getCompletedByForce();
 
     /**
+     * Total number of exchanges discarded
+     */
+    long getDiscarded();
+
+    /**
      * Reset the counters
      */
     void reset();
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fabf1d3..1db37b2 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -138,6 +138,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Total number of exchanged completed by completion force trigger")
     long getCompletedByForce();
 
+    @ManagedAttribute(description = "Total number of exchanged discarded")
+    long getDiscarded();
+
     @ManagedOperation(description = " Reset the statistics counters")
     void resetStatistics();
 
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e509319..468c00e 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -298,6 +298,11 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
     }
 
     @Override
+    public long getDiscarded() {
+        return processor.getStatistics().getDiscarded();
+    }
+
+    @Override
     public void resetStatistics() {
         processor.getStatistics().reset();
     }


[camel] 01/02: CAMEL-10533: AggregateController - Add forceDiscardOfGroup method

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e4cf070d0f866843650ded6bef8893b21e10276
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:17:47 2019 +0200

    CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
 .../processor/aggregate/AggregateController.java   | 15 +++++
 .../processor/aggregate/AggregateProcessor.java    | 65 +++++++++++++++++++++-
 .../aggregate/DefaultAggregateController.java      | 17 ++++++
 .../aggregator/AggregateControllerTest.java        | 63 ++++++++++++++++++++-
 .../mbean/ManagedAggregateProcessorMBean.java      |  6 ++
 .../mbean/ManagedAggregateProcessor.java           | 18 ++++++
 6 files changed, 182 insertions(+), 2 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
index bacaa8e..703065e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -51,4 +51,19 @@ public interface AggregateController {
      */
     int forceCompletionOfAllGroups();
 
+    /**
+     * To force discarding a specific group by its key.
+     *
+     * @param key the key
+     * @return <tt>1</tt> if the group was force discarded, <tt>0</tt> if the group does not exist
+     */
+    int forceDiscardingOfGroup(String key);
+
+    /**
+     * To force discarding of all groups
+     *
+     * @return number of groups that were force discarded
+     */
+    int forceDiscardingOfAllGroups();
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c7dbb09..edcd71c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -799,7 +799,6 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             }
         }
 
-
         log.debug("Processing aggregated exchange: {}", exchange);
 
         // add on completion task so we remember to update the inProgressCompleteExchanges
@@ -1665,4 +1664,68 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return total;
     }
 
+    public int forceDiscardingOfGroup(String key) {
+        // must acquire the shared aggregation lock to be able to trigger force discarding
+        int total = 0;
+
+        lock.lock();
+        try {
+            Exchange exchange = aggregationRepository.get(camelContext, key);
+            if (exchange != null) {
+                total = 1;
+                log.trace("Force discarded triggered for correlation key: {}", key);
+                // force discarding by setting aggregate failed as true
+                onCompletion(key, exchange, exchange, false, true);
+            }
+        } finally {
+            lock.unlock();
+        }
+        log.trace("Completed force discarded of group {}", key);
+
+        if (total > 0) {
+            log.debug("Forcing discarding of group {} with {} exchanges", key, total);
+        }
+        return total;
+    }
+
+    public int forceDiscardingOfAllGroups() {
+
+        // only run if CamelContext has been fully started or is stopping
+        boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
+        if (!allow) {
+            log.warn("Cannot start force discarding of all groups because CamelContext({}) has not been started", camelContext.getName());
+            return 0;
+        }
+
+        log.trace("Starting force discarding of all groups task");
+
+        // trigger discarding for all groups in the repository
+        Set<String> keys = aggregationRepository.getKeys();
+
+        int total = 0;
+        if (keys != null && !keys.isEmpty()) {
+            // must acquire the shared aggregation lock to be able to trigger force discarding
+            lock.lock();
+            total = keys.size();
+            try {
+                for (String key : keys) {
+                    Exchange exchange = aggregationRepository.get(camelContext, key);
+                    if (exchange != null) {
+                        log.trace("Force discarded triggered for correlation key: {}", key);
+                        // force discarding by setting aggregate failed as true
+                        onCompletion(key, exchange, exchange, false, true);
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        log.trace("Completed force discarding of all groups task");
+
+        if (total > 0) {
+            log.debug("Forcing discarding of all groups with {} exchanges", total);
+        }
+        return total;
+    }
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
index 1650133..6765bfd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -51,4 +51,21 @@ public class DefaultAggregateController implements AggregateController {
         }
     }
 
+    @Override
+    public int forceDiscardingOfGroup(String key) {
+        if (processor != null) {
+            return processor.forceDiscardingOfGroup(key);
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public int forceDiscardingOfAllGroups() {
+        if (processor != null) {
+            return processor.forceDiscardingOfAllGroups();
+        } else {
+            return 0;
+        }
+    }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index be792be..00e348d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -77,6 +77,67 @@ public class AggregateControllerTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceDiscardingOfGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+        // the first 5 messages are discarded
+        getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+        int groups = getAggregateController().forceDiscardingOfGroup("1");
+        assertEquals(1, groups);
+
+        template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testForceDiscardingOfAll() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+        // the first 5 messages are discarded
+        getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+        template.sendBodyAndHeader("direct:start", "test0", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+        int groups = getAggregateController().forceDiscardingOfAllGroups();
+        assertEquals(2, groups);
+
+        template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -85,7 +146,7 @@ public class AggregateControllerTest extends ContextTestSupport {
                 from("direct:start")
                         .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController())
                         .completionSize(10)
-                        .to("mock:aggregated");
+                        .to("log:aggregated", "mock:aggregated");
             }
         };
     }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index cae35d9..fabf1d3 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -99,6 +99,12 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedOperation(description = "To force complete of all groups")
     int forceCompletionOfAllGroups();
 
+    @ManagedOperation(description = "To force discarding a specific group by its key")
+    int forceDiscardingOfGroup(String key);
+
+    @ManagedOperation(description = "To force discarding of all groups")
+    int forceDiscardingOfAllGroups();
+
     @ManagedAttribute(description = "Current number of closed correlation keys in the memory cache")
     int getClosedCorrelationKeysCacheSize();
 
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index f77b4ec..e509319 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -225,6 +225,24 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
     }
 
     @Override
+    public int forceDiscardingOfGroup(String key) {
+        if (processor.getAggregateController() != null) {
+            return processor.getAggregateController().forceDiscardingOfGroup(key);
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public int forceDiscardingOfAllGroups() {
+        if (processor.getAggregateController() != null) {
+            return processor.getAggregateController().forceDiscardingOfAllGroups();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
     public int getClosedCorrelationKeysCacheSize() {
         return processor.getClosedCorrelationKeysCacheSize();
     }