You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/27 15:22:42 UTC
[camel] branch main updated: CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 07a66cf4b0c CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole
07a66cf4b0c is described below
commit 07a66cf4b0c995cc2a93714255dc6068d89bb5df
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Dec 27 16:18:14 2022 +0100
CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole
---
.../apache/camel/impl/console/EventConsole.java | 54 ++++++++++++++--------
1 file changed, 34 insertions(+), 20 deletions(-)
diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
index 57b2b9fee66..360a55c390a 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
@@ -16,11 +16,11 @@
*/
package org.apache.camel.impl.console;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.Configurer;
@@ -37,9 +37,9 @@ public class EventConsole extends AbstractDevConsole {
@Metadata(defaultValue = "25", description = "Maximum capacity of last number of events to capture")
private int capacity = 25;
- private Queue<CamelEvent> events;
- private Queue<CamelEvent.RouteEvent> routeEvents;
- private Queue<CamelEvent.ExchangeEvent> exchangeEvents;
+ private BlockingQueue<CamelEvent> events;
+ private BlockingQueue<CamelEvent.RouteEvent> routeEvents;
+ private BlockingQueue<CamelEvent.ExchangeEvent> exchangeEvents;
private final ConsoleEventNotifier listener = new ConsoleEventNotifier();
public EventConsole() {
@@ -56,9 +56,10 @@ public class EventConsole extends AbstractDevConsole {
@Override
protected void doInit() throws Exception {
- this.events = new ArrayDeque<>(capacity);
- this.routeEvents = new ArrayDeque<>(capacity);
- this.exchangeEvents = new ArrayDeque<>(capacity);
+ // bounded queues holding at most 'capacity' events each; fair=true so threads contending for a queue are served in FIFO order, keeping events in arrival order
+ this.events = new ArrayBlockingQueue<>(capacity, true);
+ this.routeEvents = new ArrayBlockingQueue<>(capacity, true);
+ this.exchangeEvents = new ArrayBlockingQueue<>(capacity, true);
}
@Override
@@ -164,21 +165,34 @@ public class EventConsole extends AbstractDevConsole {
@Override
public void notify(CamelEvent event) throws Exception {
+ // bounded queues: offer the new event; when the queue is full, evict the head of that SAME queue and retry
+
if (event instanceof CamelEvent.ExchangeEvent) {
- if (exchangeEvents.size() >= capacity) {
- exchangeEvents.poll();
- }
- exchangeEvents.add((CamelEvent.ExchangeEvent) event);
+ CamelEvent.ExchangeEvent ce = (CamelEvent.ExchangeEvent) event;
+ boolean added;
+ do {
+ added = exchangeEvents.offer(ce);
+ if (!added) {
+ exchangeEvents.poll();
+ }
+ } while (!added);
} else if (event instanceof CamelEvent.RouteEvent) {
- if (routeEvents.size() >= capacity) {
- routeEvents.poll();
- }
- routeEvents.add((CamelEvent.RouteEvent) event);
+ CamelEvent.RouteEvent re = (CamelEvent.RouteEvent) event;
+ boolean added;
+ do {
+ added = routeEvents.offer(re);
+ if (!added) {
+ routeEvents.poll();
+ }
+ } while (!added);
} else {
- if (events.size() >= capacity) {
- events.poll();
- }
- events.offer(event);
+ boolean added;
+ do {
+ added = events.offer(event);
+ if (!added) {
+ events.poll();
+ }
+ } while (!added);
}
}