You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 09:21:35 UTC

[camel] 01/02: CAMEL-10533: AggregateController - Add forceDiscardOfGroup method

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3e4cf070d0f866843650ded6bef8893b21e10276
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:17:47 2019 +0200

    CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
 .../processor/aggregate/AggregateController.java   | 15 +++++
 .../processor/aggregate/AggregateProcessor.java    | 65 +++++++++++++++++++++-
 .../aggregate/DefaultAggregateController.java      | 17 ++++++
 .../aggregator/AggregateControllerTest.java        | 63 ++++++++++++++++++++-
 .../mbean/ManagedAggregateProcessorMBean.java      |  6 ++
 .../mbean/ManagedAggregateProcessor.java           | 18 ++++++
 6 files changed, 182 insertions(+), 2 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
index bacaa8e..703065e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -51,4 +51,19 @@ public interface AggregateController {
      */
     int forceCompletionOfAllGroups();
 
+    /**
+     * To force discarding a specific group by its key.
+     *
+     * @param key the key
+     * @return <tt>1</tt> if the group was forced discarded, <tt>0</tt> if the group does not exist
+     */
+    int forceDiscardingOfGroup(String key);
+
+    /**
+     * To force discarding of all groups
+     *
+     * @return number of groups that were forced discarded
+     */
+    int forceDiscardingOfAllGroups();
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c7dbb09..edcd71c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -799,7 +799,6 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             }
         }
 
-
         log.debug("Processing aggregated exchange: {}", exchange);
 
         // add on completion task so we remember to update the inProgressCompleteExchanges
@@ -1665,4 +1664,68 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         return total;
     }
 
+    public int forceDiscardingOfGroup(String key) {
+        // must acquire the shared aggregation lock to be able to trigger force discarding
+        int total = 0;
+
+        lock.lock();
+        try {
+            Exchange exchange = aggregationRepository.get(camelContext, key);
+            if (exchange != null) {
+                total = 1;
+                log.trace("Force discarded triggered for correlation key: {}", key);
+                // force discarding by setting aggregate failed as true
+                onCompletion(key, exchange, exchange, false, true);
+            }
+        } finally {
+            lock.unlock();
+        }
+        log.trace("Completed force discarded of group {}", key);
+
+        if (total > 0) {
+            log.debug("Forcing discarding of group {} with {} exchanges", key, total);
+        }
+        return total;
+    }
+
+    public int forceDiscardingOfAllGroups() {
+        // only run if CamelContext has been fully started or is stopping
+        boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
+        if (!allow) {
+            log.warn("Cannot start force discarding of all groups because CamelContext({}) has not been started", camelContext.getName());
+            return 0;
+        }
+
+        log.trace("Starting force discarding of all groups task");
+
+        // trigger discarding for all in the repository
+        Set<String> keys = aggregationRepository.getKeys();
+
+        int total = 0;
+        if (keys != null && !keys.isEmpty()) {
+            // must acquire the shared aggregation lock to be able to trigger force discarding
+            lock.lock();
+            try {
+                for (String key : keys) {
+                    Exchange exchange = aggregationRepository.get(camelContext, key);
+                    if (exchange != null) {
+                        // count only groups still present, as the keys were read before the lock was taken
+                        total++;
+                        log.trace("Force discarded triggered for correlation key: {}", key);
+                        // force discarding by setting aggregate failed as true
+                        onCompletion(key, exchange, exchange, false, true);
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        log.trace("Completed force discarding of all groups task");
+
+        if (total > 0) {
+            log.debug("Forcing discarding of all groups with {} exchanges", total);
+        }
+        return total;
+    }
+
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
index 1650133..6765bfd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -51,4 +51,21 @@ public class DefaultAggregateController implements AggregateController {
         }
     }
 
+    @Override
+    public int forceDiscardingOfGroup(String key) {
+        if (processor == null) {
+            return 0;
+        }
+        // a processor is present, so let it discard the group
+        return processor.forceDiscardingOfGroup(key);
+    }
+
+    @Override
+    public int forceDiscardingOfAllGroups() {
+        if (processor == null) {
+            return 0;
+        }
+        // a processor is present, so let it discard every group
+        return processor.forceDiscardingOfAllGroups();
+    }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index be792be..00e348d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -77,6 +77,67 @@ public class AggregateControllerTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
+    @Test
+    public void testForceDiscardingOfGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+        // the first 5 messages are force discarded, so the aggregated body is expected to start with test6
+        getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+        int groups = getAggregateController().forceDiscardingOfGroup("1");
+        assertEquals(1, groups);
+
+        template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testForceDiscardingOfAll() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+        // the first 6 messages (one in group 2, five in group 1) are discarded, so the body should start with test6
+        getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+        template.sendBodyAndHeader("direct:start", "test0", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+        int groups = getAggregateController().forceDiscardingOfAllGroups();
+        assertEquals(2, groups);
+
+        template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -85,7 +146,7 @@ public class AggregateControllerTest extends ContextTestSupport {
                 from("direct:start")
                         .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController())
                         .completionSize(10)
-                        .to("mock:aggregated");
+                        .to("log:aggregated", "mock:aggregated");
             }
         };
     }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index cae35d9..fabf1d3 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -99,6 +99,12 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedOperation(description = "To force complete of all groups")
     int forceCompletionOfAllGroups();
 
+    @ManagedOperation(description = "To force discarding a specific group by its key")
+    int forceDiscardingOfGroup(String key);
+
+    @ManagedOperation(description = "To force discarding of all groups")
+    int forceDiscardingOfAllGroups();
+
     @ManagedAttribute(description = "Current number of closed correlation keys in the memory cache")
     int getClosedCorrelationKeysCacheSize();
 
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index f77b4ec..e509319 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -225,6 +225,24 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
     }
 
     @Override
+    public int forceDiscardingOfGroup(String key) {
+        if (processor.getAggregateController() == null) {
+            return 0;
+        }
+        // a controller is configured, so let it discard the group
+        return processor.getAggregateController().forceDiscardingOfGroup(key);
+    }
+
+    @Override
+    public int forceDiscardingOfAllGroups() {
+        if (processor.getAggregateController() == null) {
+            return 0;
+        }
+        // a controller is configured, so let it discard every group
+        return processor.getAggregateController().forceDiscardingOfAllGroups();
+    }
+
+    @Override
     public int getClosedCorrelationKeysCacheSize() {
         return processor.getClosedCorrelationKeysCacheSize();
     }