You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 09:21:35 UTC
[camel] 01/02: CAMEL-10533: AggregateController - Add
forceDiscardOfGroup method
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3e4cf070d0f866843650ded6bef8893b21e10276
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 11:17:47 2019 +0200
CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
.../processor/aggregate/AggregateController.java | 15 +++++
.../processor/aggregate/AggregateProcessor.java | 65 +++++++++++++++++++++-
.../aggregate/DefaultAggregateController.java | 17 ++++++
.../aggregator/AggregateControllerTest.java | 63 ++++++++++++++++++++-
.../mbean/ManagedAggregateProcessorMBean.java | 6 ++
.../mbean/ManagedAggregateProcessor.java | 18 ++++++
6 files changed, 182 insertions(+), 2 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
index bacaa8e..703065e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -51,4 +51,19 @@ public interface AggregateController {
*/
int forceCompletionOfAllGroups();
+ /**
+ * To force discarding a specific group by its key.
+ *
+ * @param key the key
+ * @return <tt>1</tt> if the group was forced discarded, <tt>0</tt> if the group does not exists
+ */
+ int forceDiscardingOfGroup(String key);
+
+ /**
+ * To force discardingof all groups
+ *
+ * @return number of groups that was forced discarded
+ */
+ int forceDiscardingOfAllGroups();
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index c7dbb09..edcd71c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -799,7 +799,6 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
}
}
-
log.debug("Processing aggregated exchange: {}", exchange);
// add on completion task so we remember to update the inProgressCompleteExchanges
@@ -1665,4 +1664,68 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
return total;
}
+ public int forceDiscardingOfGroup(String key) {
+ // must acquire the shared aggregation lock to be able to trigger force completion
+ int total = 0;
+
+ lock.lock();
+ try {
+ Exchange exchange = aggregationRepository.get(camelContext, key);
+ if (exchange != null) {
+ total = 1;
+ log.trace("Force discarded triggered for correlation key: {}", key);
+ // force discarding by setting aggregate failed as true
+ onCompletion(key, exchange, exchange, false, true);
+ }
+ } finally {
+ lock.unlock();
+ }
+ log.trace("Completed force discarded of group {}", key);
+
+ if (total > 0) {
+ log.debug("Forcing discarding of group {} with {} exchanges", key, total);
+ }
+ return total;
+ }
+
+ public int forceDiscardingOfAllGroups() {
+
+ // only run if CamelContext has been fully started or is stopping
+ boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
+ if (!allow) {
+ log.warn("Cannot start force discarding of all groups because CamelContext({}) has not been started", camelContext.getName());
+ return 0;
+ }
+
+ log.trace("Starting force discarding of all groups task");
+
+ // trigger completion for all in the repository
+ Set<String> keys = aggregationRepository.getKeys();
+
+ int total = 0;
+ if (keys != null && !keys.isEmpty()) {
+ // must acquire the shared aggregation lock to be able to trigger force completion
+ lock.lock();
+ total = keys.size();
+ try {
+ for (String key : keys) {
+ Exchange exchange = aggregationRepository.get(camelContext, key);
+ if (exchange != null) {
+ log.trace("Force discarded triggered for correlation key: {}", key);
+ // force discarding by setting aggregate failed as true
+ onCompletion(key, exchange, exchange, false, true);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ log.trace("Completed force discarding of all groups task");
+
+ if (total > 0) {
+ log.debug("Forcing discarding of all groups with {} exchanges", total);
+ }
+ return total;
+ }
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
index 1650133..6765bfd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -51,4 +51,21 @@ public class DefaultAggregateController implements AggregateController {
}
}
+ @Override
+ public int forceDiscardingOfGroup(String key) {
+ if (processor != null) {
+ return processor.forceDiscardingOfGroup(key);
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public int forceDiscardingOfAllGroups() {
+ if (processor != null) {
+ return processor.forceDiscardingOfAllGroups();
+ } else {
+ return 0;
+ }
+ }
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
index be792be..00e348d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -77,6 +77,67 @@ public class AggregateControllerTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
+ @Test
+ public void testForceDiscardingOfGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+ getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+ // the first 5 messages are discarded
+ getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+ int groups = getAggregateController().forceDiscardingOfGroup("1");
+ assertEquals(1, groups);
+
+ template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testForceDiscardingOfAll() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+ getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1");
+ // the first 5 messages are discarded
+ getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6");
+
+ template.sendBodyAndHeader("direct:start", "test0", "id", "2");
+ template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test2", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test4", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test5", "id", "1");
+
+ int groups = getAggregateController().forceDiscardingOfAllGroups();
+ assertEquals(2, groups);
+
+ template.sendBodyAndHeader("direct:start", "test6", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test7", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test8", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test9", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test10", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test11", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test12", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test13", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test14", "id", "1");
+ template.sendBodyAndHeader("direct:start", "test15", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -85,7 +146,7 @@ public class AggregateControllerTest extends ContextTestSupport {
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController())
.completionSize(10)
- .to("mock:aggregated");
+ .to("log:aggregated", "mock:aggregated");
}
};
}
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index cae35d9..fabf1d3 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -99,6 +99,12 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
@ManagedOperation(description = "To force complete of all groups")
int forceCompletionOfAllGroups();
+ @ManagedOperation(description = "To force discarding a specific group by its key")
+ int forceDiscardingOfGroup(String key);
+
+ @ManagedOperation(description = "To force discarding of all groups")
+ int forceDiscardingOfAllGroups();
+
@ManagedAttribute(description = "Current number of closed correlation keys in the memory cache")
int getClosedCorrelationKeysCacheSize();
diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index f77b4ec..e509319 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -225,6 +225,24 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
}
@Override
+ public int forceDiscardingOfGroup(String key) {
+ if (processor.getAggregateController() != null) {
+ return processor.getAggregateController().forceDiscardingOfGroup(key);
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public int forceDiscardingOfAllGroups() {
+ if (processor.getAggregateController() != null) {
+ return processor.getAggregateController().forceDiscardingOfAllGroups();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
public int getClosedCorrelationKeysCacheSize() {
return processor.getClosedCorrelationKeysCacheSize();
}