You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/06/23 11:46:41 UTC

[camel] branch CAMEL-18217/suspend-messages updated (22afcbd0418 -> 5d152975ab5)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-18217/suspend-messages
in repository https://gitbox.apache.org/repos/asf/camel.git


 discard 22afcbd0418 CAMEL-18217: debugger - Allow to suspend messages
     new 5d152975ab5 CAMEL-18217: debugger - Allow to suspend messages

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (22afcbd0418)
            \
             N -- N -- N   refs/heads/CAMEL-18217/suspend-messages (5d152975ab5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/impl/debugger/BacklogDebugger.java       | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)


[camel] 01/01: CAMEL-18217: debugger - Allow to suspend messages

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-18217/suspend-messages
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5d152975ab507e054beb34f02327a63ddfb44376
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Thu Jun 23 12:16:31 2022 +0200

    CAMEL-18217: debugger - Allow to suspend messages
---
 camel-dependencies/pom.xml                         |   1 +
 .../camel/impl/debugger/BacklogDebugger.java       | 104 +++++++++++++++++++--
 .../camel/impl/engine/CamelInternalProcessor.java  |  10 +-
 .../mbean/ManagedBacklogDebuggerMBean.java         |   6 ++
 core/camel-management/pom.xml                      |   5 +
 .../management/mbean/ManagedBacklogDebugger.java   |  10 ++
 .../camel/management/BacklogDebuggerTest.java      |  47 ++++++++++
 docs/user-manual/modules/ROOT/pages/debugger.adoc  |   7 +-
 parent/pom.xml                                     |   6 ++
 9 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 822dcd37cef..a9b2b7b1744 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -355,6 +355,7 @@
     <jt400-version>11.0</jt400-version>
     <jta-api-1.2-version>1.2</jta-api-1.2-version>
     <junit-jupiter-version>5.8.2</junit-jupiter-version>
+    <junit-pioneer-version>1.7.1</junit-pioneer-version>
     <junit-toolbox-version>2.4</junit-toolbox-version>
     <junit-version>4.13.2</junit-version>
     <jxmpp-version>0.6.4</jxmpp-version>
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
index a7e92b902c3..246bb05bd27 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -44,6 +45,7 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,11 @@ import org.slf4j.LoggerFactory;
  */
 public final class BacklogDebugger extends ServiceSupport {
 
+    /**
+     * The name of the environment variable that contains the value of the flag indicating whether the
+     * {@code BacklogDebugger} should suspend processing the messages and wait for a debugger to attach or not.
+     */
+    public static final String SUSPEND_MODE_ENV_VAR_NAME = "CAMEL_DEBUGGER_SUSPEND";
     private static final Logger LOG = LoggerFactory.getLogger(BacklogDebugger.class);
 
     private long fallbackTimeout = 300;
@@ -73,6 +80,15 @@ public final class BacklogDebugger extends ServiceSupport {
     private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<>();
+    /**
+     * Indicates whether the <i>suspend mode</i> is enabled or not.
+     */
+    private final boolean suspendMode;
+    /**
+     * The reference to the {@code CountDownLatch} used to suspend Camel from processing the incoming messages when the
+     * <i>suspend mode</i> is enabled.
+     */
+    private final AtomicReference<CountDownLatch> suspend = new AtomicReference<>();
     private volatile String singleStepExchangeId;
     private int bodyMaxChars = 128 * 1024;
     private boolean bodyIncludeStreams;
@@ -103,19 +119,31 @@ public final class BacklogDebugger extends ServiceSupport {
         }
     }
 
-    private BacklogDebugger(CamelContext camelContext) {
+    /**
+     * Constructs a {@code BacklogDebugger} with the given parameters.
+     *
+     * @param camelContext the camel context
+     * @param suspendMode  Indicates whether the <i>suspend mode</i> is enabled or not. If {@code true} the message
+     *                     processing is immediately suspended until the {@link #attach()} is called.
+     */
+    private BacklogDebugger(CamelContext camelContext, boolean suspendMode) {
         this.camelContext = camelContext;
         this.debugger = new DefaultDebugger(camelContext);
+        this.suspendMode = suspendMode;
+        detach();
     }
 
     /**
      * Creates a new backlog debugger.
+     * <p>
+     * In case the environment variable {@link #SUSPEND_MODE_ENV_VAR_NAME} has been set to {@code true}, the message
+     * processing is directly suspended.
      *
      * @param  context Camel context
      * @return         a new backlog debugger
      */
     public static BacklogDebugger createDebugger(CamelContext context) {
-        return new BacklogDebugger(context);
+        return new BacklogDebugger(context, Boolean.parseBoolean(System.getenv(SUSPEND_MODE_ENV_VAR_NAME)));
     }
 
     /**
@@ -169,6 +197,65 @@ public final class BacklogDebugger extends ServiceSupport {
         return singleStepExchangeId != null;
     }
 
+    /**
+     * Attach the debugger which will resume the message processing in case the <i>suspend mode</i> is enabled. Do
+     * nothing otherwise.
+     */
+    public void attach() {
+        if (suspendMode) {
+            logger.log("A debugger has been attached");
+            resumeMessageProcessing();
+        }
+    }
+
+    /**
+     * Detach the debugger which will suspend the message processing in case the <i>suspend mode</i> is enabled. Do
+     * nothing otherwise.
+     */
+    public void detach() {
+        if (suspendMode) {
+            logger.log("Waiting for a debugger to attach");
+            suspendMessageProcessing();
+        }
+    }
+
+    /**
+     * Suspend the current thread if the <i>suspend mode</i> is enabled as long as the method {@link #attach()} is not
+     * called. Do nothing otherwise.
+     */
+    private void suspendIfNeeded() {
+        final CountDownLatch countDownLatch = suspend.get();
+        if (countDownLatch != null) {
+            logger.log("Incoming message suspended");
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Make Camel suspend processing incoming messages.
+     */
+    private void suspendMessageProcessing() {
+        suspend.compareAndSet(null, new CountDownLatch(1));
+    }
+
+    /**
+     * Resume the processing of the incoming messages.
+     */
+    private void resumeMessageProcessing() {
+        for (;;) {
+            final CountDownLatch countDownLatch = suspend.get();
+            if (countDownLatch == null) {
+                break;
+            } else if (suspend.compareAndSet(countDownLatch, null)) {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
     public void addBreakpoint(String nodeId) {
         NodeBreakpoint breakpoint = breakpoints.get(nodeId);
         if (breakpoint == null) {
@@ -454,13 +541,18 @@ public final class BacklogDebugger extends ServiceSupport {
         debugCounter.set(0);
     }
 
-    public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode definition) {
-        return debugger.beforeProcess(exchange, processor, definition);
+    public StopWatch beforeProcess(Exchange exchange, Processor processor, NamedNode definition) {
+        suspendIfNeeded();
+        if (isEnabled() && (hasBreakpoint(definition.getId()) || isSingleStepMode())) {
+            StopWatch watch = new StopWatch();
+            debugger.beforeProcess(exchange, processor, definition);
+            return watch;
+        }
+        return null;
     }
 
-    public boolean afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) {
+    public void afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) {
         // noop
-        return false;
     }
 
     @Override
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 6a031a1f402..7babd956ea8 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -639,24 +639,16 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         private final BacklogDebugger backlogDebugger;
         private final Processor target;
         private final NamedNode definition;
-        private final String nodeId;
 
         public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, NamedNode definition) {
             this.backlogDebugger = backlogDebugger;
             this.target = target;
             this.definition = definition;
-            this.nodeId = definition.getId();
         }
 
         @Override
         public StopWatch before(Exchange exchange) throws Exception {
-            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
-                StopWatch watch = new StopWatch();
-                backlogDebugger.beforeProcess(exchange, target, definition);
-                return watch;
-            } else {
-                return null;
-            }
+            return backlogDebugger.beforeProcess(exchange, target, definition);
         }
 
         @Override
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index b824a84c356..57c2f7573e8 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -175,4 +175,10 @@ public interface ManagedBacklogDebuggerMBean {
 
     @ManagedOperation(description = "Returns the message history at the given node id as XML")
     String messageHistoryOnBreakpointAsXml(String nodeId);
+
+    @ManagedOperation(description = "Attach the debugger")
+    void attach();
+
+    @ManagedOperation(description = "Detach the debugger")
+    void detach();
 }
diff --git a/core/camel-management/pom.xml b/core/camel-management/pom.xml
index 577572d5cef..1e225b27d4c 100644
--- a/core/camel-management/pom.xml
+++ b/core/camel-management/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit-pioneer</groupId>
+            <artifactId>junit-pioneer</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core</artifactId>
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 0f78390a6fc..0fb57d6fc0b 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -441,6 +441,16 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean {
         return messageHistoryBuffer.toString();
     }
 
+    @Override
+    public void attach() {
+        backlogDebugger.attach();
+    }
+
+    @Override
+    public void detach() {
+        backlogDebugger.detach();
+    }
+
     private String dumpExchangePropertiesAsXml(String id) {
         StringBuilder sb = new StringBuilder();
         sb.append("  <exchangeProperties>\n");
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 01b8702f5d8..24a1e03f758 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -24,9 +24,11 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -890,6 +892,51 @@ public class BacklogDebuggerTest extends ManagementTestSupport {
         assertEquals(0, nodes.size());
     }
 
+    /**
+     * Ensure that the suspend mode works as expected.
+     */
+    @Test
+    @SetEnvironmentVariable(key = BacklogDebugger.SUSPEND_MODE_ENV_VAR_NAME, value = "true")
+    public void testSuspendMode() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName(
+                "org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+
+        resetMocks();
+
+        // Detach debugger
+        mbeanServer.invoke(on, "detach", null, null);
+
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World 2");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/docs/user-manual/modules/ROOT/pages/debugger.adoc b/docs/user-manual/modules/ROOT/pages/debugger.adoc
index 1ac971f9b30..845ed592305 100644
--- a/docs/user-manual/modules/ROOT/pages/debugger.adoc
+++ b/docs/user-manual/modules/ROOT/pages/debugger.adoc
@@ -90,7 +90,12 @@ which can be used to extend for custom implementations.
 
 === JMX debugger
 
-There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows debugging from JMX.
+There is also a xref:backlog-debugger.adoc[Backlog Debugger], included in `camel-debug`, which allows debugging from JMX.
+
+To have enough time to add your breakpoints, you may need to suspend the message processing of Camel to make sure
+that you do not miss any messages. To do so, set the environment variable `CAMEL_DEBUGGER_SUSPEND` to `true`
+within the context of your application; the `Backlog Debugger` then suspends the message processing until the JMX operation `attach` is called. Calling the JMX operation `detach` suspends the message processing again.
+
 Several 3rd party tooling are using it:
 - https://hawt.io/[hawtio] uses this for its web based debugging functionality
 - https://marketplace.visualstudio.com/items?itemName=redhat.vscode-debug-adapter-apache-camel[VS Code Debug Adapter for Camel]
diff --git a/parent/pom.xml b/parent/pom.xml
index 67d70ff8e80..772cc4ed68b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -341,6 +341,7 @@
         <junit-toolbox-version>2.4</junit-toolbox-version>
         <junit-version>4.13.2</junit-version>
         <junit-jupiter-version>5.8.2</junit-jupiter-version>
+        <junit-pioneer-version>1.7.1</junit-pioneer-version>
         <jxmpp-version>0.6.4</jxmpp-version>
         <jython-version>2.7.2</jython-version>
         <jython-standalone-version>2.7.2</jython-standalone-version>
@@ -3614,6 +3615,11 @@
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.junit-pioneer</groupId>
+                <artifactId>junit-pioneer</artifactId>
+                <version>${junit-pioneer-version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.awaitility</groupId>
                 <artifactId>awaitility</artifactId>