You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/29 15:31:13 UTC

[camel] 01/02: CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is sent to endpoints with various EIPs.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch jbang-trace
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 314d2fa25343b6c15ed5c973ba619cd41c63bae7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jul 29 16:50:37 2023 +0200

    CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is sent to endpoints with various EIPs.
---
 .../camel/spi/BacklogTracerEventMessage.java       |  6 ++
 .../debugger/DefaultBacklogTracerEventMessage.java | 15 ++++
 .../camel/impl/engine/CamelInternalProcessor.java  | 86 +++++++++++++++++++++-
 .../apache/camel/impl/engine/DefaultChannel.java   |  2 +-
 .../apache/camel/support/HealthCheckComponent.java |  6 +-
 5 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 1529d3d4151..69c2ed251f2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -121,6 +121,12 @@ public interface BacklogTracerEventMessage {
      */
     String getExceptionAsJSon();
 
+    /**
+     * The endpoint uri if this trace is either from a route input (from), or the exchange was sent to an endpoint such
+     * as (to, toD, wireTap) etc.
+     */
+    String getEndpointUri();
+
     /**
      * Dumps the event message as XML using the {@link #ROOT_TAG} as root tag.
      * <p/>
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index bcdeb3a24b5..f5e4cc70723 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -40,6 +40,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
     private final String toNode;
     private final String exchangeId;
     private final String threadName;
+    private String endpointUri;
     private final boolean rest;
     private final boolean template;
     private final String messageAsXml;
@@ -183,6 +184,14 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         this.exceptionAsJSon = exceptionAsJSon;
     }
 
+    public String getEndpointUri() {
+        return endpointUri;
+    }
+
+    public void setEndpointUri(String endpointUri) {
+        this.endpointUri = endpointUri;
+    }
+
     @Override
     public String toString() {
         return "DefaultBacklogTracerEventMessage[" + exchangeId + " at " + toNode + "]";
@@ -218,6 +227,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         }
         // route id is optional and we then use an empty value for no route id
         sb.append(prefix).append("  <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n");
+        if (endpointUri != null) {
+            sb.append(prefix).append("  <endpointUri>").append(endpointUri).append("</endpointUri>\n");
+        }
         if (toNode != null) {
             sb.append(prefix).append("  <toNode>").append(toNode).append("</toNode>\n");
         } else {
@@ -254,6 +266,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         if (location != null) {
             jo.put("location", location);
         }
+        if (endpointUri != null) {
+            jo.put("endpointUri", endpointUri);
+        }
         if (routeId != null) {
             jo.put("routeId", routeId);
         }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 08dc47d675a..ba72efb9b63 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -19,11 +19,14 @@ package org.apache.camel.impl.engine;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
@@ -39,8 +42,10 @@ import org.apache.camel.StreamCacheException;
 import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.apache.camel.impl.debugger.BacklogTracer;
 import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
 import org.apache.camel.spi.Debugger;
+import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
@@ -63,6 +68,7 @@ import org.apache.camel.support.LoggerHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.SimpleEventNotifierSupport;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -552,6 +558,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
     public static final class BacklogTracerAdvice
             implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage>, Ordered {
 
+        private final BacklogTraceAdviceEventNotifier notifier;
+        private final CamelContext camelContext;
         private final BacklogTracer backlogTracer;
         private final NamedNode processorDefinition;
         private final NamedRoute routeDefinition;
@@ -560,8 +568,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         private final boolean template;
         private final boolean skip;
 
-        public BacklogTracerAdvice(BacklogTracer backlogTracer, NamedNode processorDefinition,
+        public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer backlogTracer, NamedNode processorDefinition,
                                    NamedRoute routeDefinition, boolean first) {
+            this.camelContext = camelContext;
             this.backlogTracer = backlogTracer;
             this.processorDefinition = processorDefinition;
             this.routeDefinition = routeDefinition;
@@ -582,11 +591,28 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             } else {
                 this.skip = false;
             }
+            this.notifier = getOrCreateEventNotifier(camelContext);
+        }
+
+        private BacklogTraceAdviceEventNotifier getOrCreateEventNotifier(CamelContext camelContext) {
+            // reuse the notifier if one is already registered, otherwise register a new one (single shared instance)
+            for (EventNotifier en : camelContext.getManagementStrategy().getEventNotifiers()) {
+                if (en instanceof BacklogTraceAdviceEventNotifier) {
+                    return (BacklogTraceAdviceEventNotifier) en;
+                }
+            }
+            BacklogTraceAdviceEventNotifier answer = new BacklogTraceAdviceEventNotifier();
+            camelContext.getManagementStrategy().addEventNotifier(answer);
+            return answer;
+        }
 
         @Override
         public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception {
             if (!skip && backlogTracer.shouldTrace(processorDefinition, exchange)) {
+
+                // to capture if the exchange was sent to an endpoint during this event
+                notifier.before(exchange);
+
                 long timestamp = System.currentTimeMillis();
                 String toNode = processorDefinition.getId();
                 String exchangeId = exchange.getExchangeId();
@@ -656,6 +682,25 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
 
         private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) {
             data.doneProcessing();
+
+            String uri = null;
+            Endpoint endpoint = notifier.after(exchange);
+            if (endpoint != null) {
+                uri = endpoint.getEndpointUri();
+            } else if ((data.isFirst() || data.isLast()) && data.getToNode() == null && routeDefinition != null) {
+                // pseudo first/last event (the from in the route)
+                Route route = camelContext.getRoute(routeDefinition.getRouteId());
+                if (route != null && route.getConsumer() != null) {
+                    // get the actual resolved uri
+                    uri = route.getConsumer().getEndpoint().getEndpointUri();
+                } else {
+                    uri = routeDefinition.getEndpointUrl();
+                }
+            }
+            if (uri != null) {
+                data.setEndpointUri(uri);
+            }
+
             if (!data.isFirst()) {
                 // we want to capture if there was an exception
                 Throwable e = exchange.getException();
@@ -1212,4 +1257,43 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         }
     }
 
+    /**
+     * Event notifier for {@link BacklogTracerAdvice} that captures the {@link Endpoint} an {@link Exchange} is sent
+     * to while the exchange is being traced, i.e. between {@link #before(Exchange)} and {@link #after(Exchange)}.
+     */
+    private static final class BacklogTraceAdviceEventNotifier extends SimpleEventNotifierSupport {
+
+        // placeholder value for an exchange that is tracked but has not been sent to an endpoint
+        private final Object dummy = new Object();
+        // tracked exchange -> dummy placeholder, or the endpoint of the latest sending event
+        private final ConcurrentMap<Exchange, Object> uris = new ConcurrentHashMap<>();
+
+        public BacklogTraceAdviceEventNotifier() {
+            // only capture sending events
+            setIgnoreExchangeEvents(false);
+            setIgnoreExchangeSendingEvents(false);
+        }
+
+        @Override
+        public void notify(CamelEvent event) throws Exception {
+            if (event instanceof CamelEvent.ExchangeSendingEvent ess) {
+                // atomic replace-if-present: containsKey + put could re-add an exchange after() just removed
+                uris.replace(ess.getExchange(), ess.getEndpoint());
+            }
+        }
+
+        /** Starts tracking the exchange so sending events for it are recorded. */
+        public void before(Exchange exchange) {
+            uris.put(exchange, dummy);
+        }
+
+        /** Stops tracking the exchange; returns the endpoint it was last sent to, or null if none was recorded. */
+        public Endpoint after(Exchange exchange) {
+            Object o = uris.remove(exchange);
+            if (o == dummy) {
+                return null;
+            }
+            return (Endpoint) o;
+        }
+    }
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index a3ec2d77988..5d8ede6ca66 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -196,7 +196,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
         if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) {
             // add jmx backlog tracer
             BacklogTracer backlogTracer = getOrCreateBacklogTracer(camelContext);
-            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, routeDefinition, first));
+            addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition, first));
         }
         if (route.isTracing() || camelContext.isTracingStandby()) {
             // add logger tracer
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java b/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
index 99cfd4f40f8..de0107f75a2 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
@@ -57,9 +57,9 @@ public abstract class HealthCheckComponent extends DefaultComponent {
     }
 
     /**
-     * Used for enabling or disabling all producer based health checks from this component.
-     * Notice: Camel has by default disabled all producer based health-checks.
-     * You can turn on producer checks globally by setting camel.health.producersEnabled=true.
+     * Used for enabling or disabling all producer based health checks from this component. Notice: Camel has by default
+     * disabled all producer based health-checks. You can turn on producer checks globally by setting
+     * camel.health.producersEnabled=true.
      */
     public void setHealthCheckProducerEnabled(boolean healthCheckProducerEnabled) {
         this.healthCheckProducerEnabled = healthCheckProducerEnabled;