You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/06/23 11:36:15 UTC

[camel] branch CAMEL-18217/suspend-messages created (now 22afcbd0418)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-18217/suspend-messages
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 22afcbd0418 CAMEL-18217: debugger - Allow to suspend messages

This branch includes the following new commits:

     new 22afcbd0418 CAMEL-18217: debugger - Allow to suspend messages

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-18217: debugger - Allow to suspend messages

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-18217/suspend-messages
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 22afcbd041824f70e5ed66bd01dd7edf0ce82af1
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Thu Jun 23 12:16:31 2022 +0200

    CAMEL-18217: debugger - Allow to suspend messages
---
 camel-dependencies/pom.xml                         |   1 +
 .../camel/impl/debugger/BacklogDebugger.java       | 103 +++++++++++++++++++--
 .../camel/impl/engine/CamelInternalProcessor.java  |  10 +-
 .../mbean/ManagedBacklogDebuggerMBean.java         |   6 ++
 core/camel-management/pom.xml                      |   5 +
 .../management/mbean/ManagedBacklogDebugger.java   |  10 ++
 .../camel/management/BacklogDebuggerTest.java      |  47 ++++++++++
 docs/user-manual/modules/ROOT/pages/debugger.adoc  |   7 +-
 parent/pom.xml                                     |   6 ++
 9 files changed, 179 insertions(+), 16 deletions(-)

diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 822dcd37cef..a9b2b7b1744 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -355,6 +355,7 @@
     <jt400-version>11.0</jt400-version>
     <jta-api-1.2-version>1.2</jta-api-1.2-version>
     <junit-jupiter-version>5.8.2</junit-jupiter-version>
+    <junit-pioneer-version>1.7.1</junit-pioneer-version>
     <junit-toolbox-version>2.4</junit-toolbox-version>
     <junit-version>4.13.2</junit-version>
     <jxmpp-version>0.6.4</jxmpp-version>
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
index a7e92b902c3..db6a759afde 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -44,6 +45,7 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,11 @@ import org.slf4j.LoggerFactory;
  */
 public final class BacklogDebugger extends ServiceSupport {
 
+    /**
+     * The name of the environment variable that contains the value of the flag indicating whether Camel should suspend
+     * processing the messages and wait for a debugger to attach or not.
+     */
+    public static final String SUSPEND_MODE_ENV_VAR_NAME = "CAMEL_DEBUGGER_SUSPEND";
     private static final Logger LOG = LoggerFactory.getLogger(BacklogDebugger.class);
 
     private long fallbackTimeout = 300;
@@ -73,6 +80,14 @@ public final class BacklogDebugger extends ServiceSupport {
     private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<>();
+    /**
+     * Indicates whether the suspend mode is enabled or not.
+     */
+    private final boolean suspendMode;
+    /**
+     * The reference to the {@code CountDownLatch} used to suspend Camel from processing the incoming messages.
+     */
+    private final AtomicReference<CountDownLatch> suspend = new AtomicReference<>();
     private volatile String singleStepExchangeId;
     private int bodyMaxChars = 128 * 1024;
     private boolean bodyIncludeStreams;
@@ -103,19 +118,31 @@ public final class BacklogDebugger extends ServiceSupport {
         }
     }
 
-    private BacklogDebugger(CamelContext camelContext) {
+    /**
+     * Constructs a {@code BacklogDebugger} with the given parameters.
+     *
+     * @param camelContext the camel context
+     * @param suspendMode  Indicates whether the suspend mode is enabled or not. If {@code true} the message processing
+     *                     is immediately suspended until the {@link #attach()} is called.
+     */
+    private BacklogDebugger(CamelContext camelContext, boolean suspendMode) {
         this.camelContext = camelContext;
         this.debugger = new DefaultDebugger(camelContext);
+        this.suspendMode = suspendMode;
+        detach();
     }
 
     /**
      * Creates a new backlog debugger.
+     * <p>
+     * In case the environment variable {@link #SUSPEND_MODE_ENV_VAR_NAME} has been set to {@code true}, the message
+     * processing is directly suspended.
      *
      * @param  context Camel context
      * @return         a new backlog debugger
      */
     public static BacklogDebugger createDebugger(CamelContext context) {
-        return new BacklogDebugger(context);
+        return new BacklogDebugger(context, Boolean.parseBoolean(System.getenv(SUSPEND_MODE_ENV_VAR_NAME)));
     }
 
     /**
@@ -169,6 +196,65 @@ public final class BacklogDebugger extends ServiceSupport {
         return singleStepExchangeId != null;
     }
 
+    /**
+     * Attach the debugger which will resume the message processing in case the <i>suspend mode</i> is active. Do
+     * nothing otherwise.
+     */
+    public void attach() {
+        if (suspendMode) {
+            logger.log("A debugger has been attached");
+            resumeMessageProcessing();
+        }
+    }
+
+    /**
+     * Detach the debugger which will suspend the message processing in case the <i>suspend mode</i> is active. Do
+     * nothing otherwise.
+     */
+    public void detach() {
+        if (suspendMode) {
+            logger.log("Waiting for a debugger to attach");
+            suspendMessageProcessing();
+        }
+    }
+
+    /**
+     * Suspend the current thread from processing the message if the <i>suspend mode</i> is active as long as the method
+     * {@link #attach()} is not called. Do nothing otherwise.
+     */
+    private void suspendIfNeeded() {
+        final CountDownLatch countDownLatch = suspend.get();
+        if (countDownLatch != null) {
+            logger.log("Incoming message suspended");
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Make Camel suspend processing of the incoming messages.
+     */
+    private void suspendMessageProcessing() {
+        suspend.compareAndSet(null, new CountDownLatch(1));
+    }
+
+    /**
+     * Resume the processing of the incoming messages.
+     */
+    private void resumeMessageProcessing() {
+        for (;;) {
+            final CountDownLatch countDownLatch = suspend.get();
+            if (countDownLatch == null) {
+                break;
+            } else if (suspend.compareAndSet(countDownLatch, null)) {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
     public void addBreakpoint(String nodeId) {
         NodeBreakpoint breakpoint = breakpoints.get(nodeId);
         if (breakpoint == null) {
@@ -454,13 +540,18 @@ public final class BacklogDebugger extends ServiceSupport {
         debugCounter.set(0);
     }
 
-    public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode definition) {
-        return debugger.beforeProcess(exchange, processor, definition);
+    public StopWatch beforeProcess(Exchange exchange, Processor processor, NamedNode definition) {
+        suspendIfNeeded();
+        if (isEnabled() && (hasBreakpoint(definition.getId()) || isSingleStepMode())) {
+            StopWatch watch = new StopWatch();
+            debugger.beforeProcess(exchange, processor, definition);
+            return watch;
+        }
+        return null;
     }
 
-    public boolean afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) {
+    public void afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) {
         // noop
-        return false;
     }
 
     @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 6a031a1f402..7babd956ea8 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -639,24 +639,16 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         private final BacklogDebugger backlogDebugger;
         private final Processor target;
         private final NamedNode definition;
-        private final String nodeId;
 
         public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, NamedNode definition) {
             this.backlogDebugger = backlogDebugger;
             this.target = target;
             this.definition = definition;
-            this.nodeId = definition.getId();
         }
 
         @Override
         public StopWatch before(Exchange exchange) throws Exception {
-            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
-                StopWatch watch = new StopWatch();
-                backlogDebugger.beforeProcess(exchange, target, definition);
-                return watch;
-            } else {
-                return null;
-            }
+            return backlogDebugger.beforeProcess(exchange, target, definition);
         }
 
         @Override
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index b824a84c356..57c2f7573e8 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -175,4 +175,10 @@ public interface ManagedBacklogDebuggerMBean {
 
     @ManagedOperation(description = "Returns the message history at the given node id as XML")
     String messageHistoryOnBreakpointAsXml(String nodeId);
+
+    @ManagedOperation(description = "Attach the debugger")
+    void attach();
+
+    @ManagedOperation(description = "Detach the debugger")
+    void detach();
 }
diff --git a/core/camel-management/pom.xml b/core/camel-management/pom.xml
index 577572d5cef..1e225b27d4c 100644
--- a/core/camel-management/pom.xml
+++ b/core/camel-management/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit-pioneer</groupId>
+            <artifactId>junit-pioneer</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core</artifactId>
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 0f78390a6fc..0fb57d6fc0b 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -441,6 +441,16 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
         return messageHistoryBuffer.toString();
     }
 
+    @Override
+    public void attach() {
+        backlogDebugger.attach();
+    }
+
+    @Override
+    public void detach() {
+        backlogDebugger.detach();
+    }
+
     private String dumpExchangePropertiesAsXml(String id) {
         StringBuilder sb = new StringBuilder();
         sb.append("  <exchangeProperties>\n");
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 01b8702f5d8..24a1e03f758 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -24,9 +24,11 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -890,6 +892,51 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
         assertEquals(0, nodes.size());
     }
 
+    /**
+     * Ensure that the suspend mode works as expected.
+     */
+    @Test
+    @SetEnvironmentVariable(key = BacklogDebugger.SUSPEND_MODE_ENV_VAR_NAME, value = "true")
+    public void testSuspendMode() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName(
+                "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+
+        resetMocks();
+
+        // Detach debugger
+        mbeanServer.invoke(on, "detach", null, null);
+
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World 2");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/docs/user-manual/modules/ROOT/pages/debugger.adoc b/docs/user-manual/modules/ROOT/pages/debugger.adoc
index 1ac971f9b30..845ed592305 100644
--- a/docs/user-manual/modules/ROOT/pages/debugger.adoc
+++ b/docs/user-manual/modules/ROOT/pages/debugger.adoc
@@ -90,7 +90,12 @@ which can be used to extend for custom implementations.
 
 === JMX debugger
 
-There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows debugging from JMX.
+There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows debugging from JMX that is included into `camel-debug`.
+
+To be able to have enough time to add your breakpoints, you could need to suspend the message processing of Camel to make sure
+that you won't miss any messages. For this kind of need, you have to set the environment variable `CAMEL_DEBUGGER_SUSPEND` to `true`
+within the context of your application, then the `Backlog Debugger` suspends the message processing until the JMX operation `attach` is called. Calling the JMX operation `detach` suspends again the message processing.
+
 Several 3rd party tooling are using it:
 - https://hawt.io/[hawtio] uses this for its web based debugging functionality
 - https://marketplace.visualstudio.com/items?itemName=redhat.vscode-debug-adapter-apache-camel[VS Code Debug Adapter for Camel]
diff --git a/parent/pom.xml b/parent/pom.xml
index 67d70ff8e80..772cc4ed68b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -341,6 +341,7 @@
         <junit-toolbox-version>2.4</junit-toolbox-version>
         <junit-version>4.13.2</junit-version>
         <junit-jupiter-version>5.8.2</junit-jupiter-version>
+        <junit-pioneer-version>1.7.1</junit-pioneer-version>
         <jxmpp-version>0.6.4</jxmpp-version>
         <jython-version>2.7.2</jython-version>
         <jython-standalone-version>2.7.2</jython-standalone-version>
@@ -3614,6 +3615,11 @@
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.junit-pioneer</groupId>
+                <artifactId>junit-pioneer</artifactId>
+                <version>${junit-pioneer-version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.awaitility</groupId>
                 <artifactId>awaitility</artifactId>