You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/27 15:22:42 UTC

[camel] branch main updated: CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 07a66cf4b0c CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole
07a66cf4b0c is described below

commit 07a66cf4b0c995cc2a93714255dc6068d89bb5df
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Dec 27 16:18:14 2022 +0100

    CAMEL-18844: Possible memory leak in org.apache.camel.impl.console.EventConsole
---
 .../apache/camel/impl/console/EventConsole.java    | 54 ++++++++++++++--------
 1 file changed, 34 insertions(+), 20 deletions(-)

diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
index 57b2b9fee66..360a55c390a 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
@@ -16,11 +16,11 @@
  */
 package org.apache.camel.impl.console;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
 import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.Configurer;
@@ -37,9 +37,9 @@ public class EventConsole extends AbstractDevConsole {
     @Metadata(defaultValue = "25", description = "Maximum capacity of last number of events to capture")
     private int capacity = 25;
 
-    private Queue<CamelEvent> events;
-    private Queue<CamelEvent.RouteEvent> routeEvents;
-    private Queue<CamelEvent.ExchangeEvent> exchangeEvents;
+    private BlockingQueue<CamelEvent> events;
+    private BlockingQueue<CamelEvent.RouteEvent> routeEvents;
+    private BlockingQueue<CamelEvent.ExchangeEvent> exchangeEvents;
     private final ConsoleEventNotifier listener = new ConsoleEventNotifier();
 
     public EventConsole() {
@@ -56,9 +56,10 @@ public class EventConsole extends AbstractDevConsole {
 
     @Override
     protected void doInit() throws Exception {
-        this.events = new ArrayDeque<>(capacity);
-        this.routeEvents = new ArrayDeque<>(capacity);
-        this.exchangeEvents = new ArrayDeque<>(capacity);
+        // capacity capped queue using fair to make sure events are in correct order
+        this.events = new ArrayBlockingQueue<>(capacity, true);
+        this.routeEvents = new ArrayBlockingQueue<>(capacity, true);
+        this.exchangeEvents = new ArrayBlockingQueue<>(capacity, true);
     }
 
     @Override
@@ -164,21 +165,34 @@ public class EventConsole extends AbstractDevConsole {
 
         @Override
         public void notify(CamelEvent event) throws Exception {
+            // offer new event and if false, then remove head and try again
+
             if (event instanceof CamelEvent.ExchangeEvent) {
-                if (exchangeEvents.size() >= capacity) {
-                    exchangeEvents.poll();
-                }
-                exchangeEvents.add((CamelEvent.ExchangeEvent) event);
+                CamelEvent.ExchangeEvent ce = (CamelEvent.ExchangeEvent) event;
+                boolean added;
+                do {
+                    added = exchangeEvents.offer(ce);
+                    if (!added) {
+                        exchangeEvents.poll();
+                    }
+                } while (!added);
             } else if (event instanceof CamelEvent.RouteEvent) {
-                if (routeEvents.size() >= capacity) {
-                    routeEvents.poll();
-                }
-                routeEvents.add((CamelEvent.RouteEvent) event);
+                CamelEvent.RouteEvent re = (CamelEvent.RouteEvent) event;
+                boolean added;
+                do {
+                    added = routeEvents.offer(re);
+                    if (!added) {
+                        exchangeEvents.poll();
+                    }
+                } while (!added);
             } else {
-                if (events.size() >= capacity) {
-                    events.poll();
-                }
-                events.offer(event);
+                boolean added;
+                do {
+                    added = events.offer(event);
+                    if (!added) {
+                        exchangeEvents.poll();
+                    }
+                } while (!added);
             }
         }