You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/29 15:31:12 UTC

[camel] branch jbang-trace created (now b1d6ce1e6d8)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch jbang-trace
in repository https://gitbox.apache.org/repos/asf/camel.git


      at b1d6ce1e6d8 CAMEL-19369: camel-jbang - Trace command to show endpoint uri

This branch includes the following new commits:

     new 314d2fa2534 CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is send to endpoints with varioius EIPs.
     new b1d6ce1e6d8 CAMEL-19369: camel-jbang - Trace command to show endpoint uri

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/02: CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is send to endpoints with varioius EIPs.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch jbang-trace
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 314d2fa25343b6c15ed5c973ba619cd41c63bae7
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jul 29 16:50:37 2023 +0200

    CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is send to endpoints with varioius EIPs.
---
 .../camel/spi/BacklogTracerEventMessage.java       |  6 ++
 .../debugger/DefaultBacklogTracerEventMessage.java | 15 ++++
 .../camel/impl/engine/CamelInternalProcessor.java  | 86 +++++++++++++++++++++-
 .../apache/camel/impl/engine/DefaultChannel.java   |  2 +-
 .../apache/camel/support/HealthCheckComponent.java |  6 +-
 5 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 1529d3d4151..69c2ed251f2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -121,6 +121,12 @@ public interface BacklogTracerEventMessage {
      */
     String getExceptionAsJSon();
 
+    /**
+     * The endpoint uri if this trace is either from a route input (from), or the exchange was sent to an endpoint such
+     * as (to, toD, wireTap) etc.
+     */
+    String getEndpointUri();
+
     /**
      * Dumps the event message as XML using the {@link #ROOT_TAG} as root tag.
      * <p/>
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index bcdeb3a24b5..f5e4cc70723 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -40,6 +40,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
     private final String toNode;
     private final String exchangeId;
     private final String threadName;
+    private String endpointUri;
     private final boolean rest;
     private final boolean template;
     private final String messageAsXml;
@@ -183,6 +184,14 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         this.exceptionAsJSon = exceptionAsJSon;
     }
 
+    public String getEndpointUri() {
+        return endpointUri;
+    }
+
+    public void setEndpointUri(String endpointUri) {
+        this.endpointUri = endpointUri;
+    }
+
     @Override
     public String toString() {
         return "DefaultBacklogTracerEventMessage[" + exchangeId + " at " + toNode + "]";
@@ -218,6 +227,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         }
         // route id is optional and we then use an empty value for no route id
         sb.append(prefix).append("  <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n");
+        if (endpointUri != null) {
+            sb.append(prefix).append("  <endpointUri>").append(endpointUri).append("</endpointUri>\n");
+        }
         if (toNode != null) {
             sb.append(prefix).append("  <toNode>").append(toNode).append("</toNode>\n");
         } else {
@@ -254,6 +266,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
         if (location != null) {
             jo.put("location", location);
         }
+        if (endpointUri != null) {
+            jo.put("endpointUri", endpointUri);
+        }
         if (routeId != null) {
             jo.put("routeId", routeId);
         }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 08dc47d675a..ba72efb9b63 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -19,11 +19,14 @@ package org.apache.camel.impl.engine;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
@@ -39,8 +42,10 @@ import org.apache.camel.StreamCacheException;
 import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.apache.camel.impl.debugger.BacklogTracer;
 import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
 import org.apache.camel.spi.Debugger;
+import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
@@ -63,6 +68,7 @@ import org.apache.camel.support.LoggerHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.SimpleEventNotifierSupport;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -552,6 +558,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
     public static final class BacklogTracerAdvice
             implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage>, Ordered {
 
+        private final BacklogTraceAdviceEventNotifier notifier;
+        private final CamelContext camelContext;
         private final BacklogTracer backlogTracer;
         private final NamedNode processorDefinition;
         private final NamedRoute routeDefinition;
@@ -560,8 +568,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         private final boolean template;
         private final boolean skip;
 
-        public BacklogTracerAdvice(BacklogTracer backlogTracer, NamedNode processorDefinition,
+        public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer backlogTracer, NamedNode processorDefinition,
                                    NamedRoute routeDefinition, boolean first) {
+            this.camelContext = camelContext;
             this.backlogTracer = backlogTracer;
             this.processorDefinition = processorDefinition;
             this.routeDefinition = routeDefinition;
@@ -582,11 +591,28 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
             } else {
                 this.skip = false;
             }
+            this.notifier = getOrCreateEventNotifier(camelContext);
+        }
+
+        private BacklogTraceAdviceEventNotifier getOrCreateEventNotifier(CamelContext camelContext) {
+            // use a single instance of this event notifier
+            for (EventNotifier en : camelContext.getManagementStrategy().getEventNotifiers()) {
+                if (en instanceof BacklogTraceAdviceEventNotifier) {
+                    return (BacklogTraceAdviceEventNotifier) en;
+                }
+            }
+            BacklogTraceAdviceEventNotifier answer = new BacklogTraceAdviceEventNotifier();
+            camelContext.getManagementStrategy().addEventNotifier(answer);
+            return answer;
         }
 
         @Override
         public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception {
             if (!skip && backlogTracer.shouldTrace(processorDefinition, exchange)) {
+
+                // to capture if the exchange was sent to an endpoint during this event
+                notifier.before(exchange);
+
                 long timestamp = System.currentTimeMillis();
                 String toNode = processorDefinition.getId();
                 String exchangeId = exchange.getExchangeId();
@@ -656,6 +682,25 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
 
         private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) {
             data.doneProcessing();
+
+            String uri = null;
+            Endpoint endpoint = notifier.after(exchange);
+            if (endpoint != null) {
+                uri = endpoint.getEndpointUri();
+            } else if ((data.isFirst() || data.isLast()) && data.getToNode() == null && routeDefinition != null) {
+                // pseudo first/last event (the from in the route)
+                Route route = camelContext.getRoute(routeDefinition.getRouteId());
+                if (route != null && route.getConsumer() != null) {
+                    // get the actual resolved uri
+                    uri = route.getConsumer().getEndpoint().getEndpointUri();
+                } else {
+                    uri = routeDefinition.getEndpointUrl();
+                }
+            }
+            if (uri != null) {
+                data.setEndpointUri(uri);
+            }
+
             if (!data.isFirst()) {
                 // we want to capture if there was an exception
                 Throwable e = exchange.getException();
@@ -1212,4 +1257,43 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
         }
     }
 
+    /**
+     * Event notifier for {@link BacklogTracerAdvice} to capture
+     * {@link Exchange} sent to endpoints during tracing.
+     */
+    private static final class BacklogTraceAdviceEventNotifier extends SimpleEventNotifierSupport {
+
+        private final Object dummy = new Object();
+
+        private final ConcurrentMap<Exchange, Object> uris = new ConcurrentHashMap<>();
+
+        public BacklogTraceAdviceEventNotifier() {
+            // only capture sending events
+            setIgnoreExchangeEvents(false);
+            setIgnoreExchangeSendingEvents(false);
+        }
+
+        @Override
+        public void notify(CamelEvent event) throws Exception {
+            if (event instanceof CamelEvent.ExchangeSendingEvent ess) {
+                Exchange e = ess.getExchange();
+                if (uris.containsKey(e)) {
+                    uris.put(e, ess.getEndpoint());
+                }
+            }
+        }
+
+        public void before(Exchange exchange) {
+            uris.put(exchange, dummy);
+        }
+
+        public Endpoint after(Exchange exchange) {
+            Object o = uris.remove(exchange);
+            if (o == dummy) {
+                return null;
+            }
+            return (Endpoint) o;
+        }
+
+    }
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index a3ec2d77988..5d8ede6ca66 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -196,7 +196,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
         if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) {
             // add jmx backlog tracer
             BacklogTracer backlogTracer = getOrCreateBacklogTracer(camelContext);
-            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, routeDefinition, first));
+            addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition, first));
         }
         if (route.isTracing() || camelContext.isTracingStandby()) {
             // add logger tracer
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java b/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
index 99cfd4f40f8..de0107f75a2 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/HealthCheckComponent.java
@@ -57,9 +57,9 @@ public abstract class HealthCheckComponent extends DefaultComponent {
     }
 
     /**
-     * Used for enabling or disabling all producer based health checks from this component.
-     * Notice: Camel has by default disabled all producer based health-checks.
-     * You can turn on producer checks globally by setting camel.health.producersEnabled=true.
+     * Used for enabling or disabling all producer based health checks from this component. Notice: Camel has by default
+     * disabled all producer based health-checks. You can turn on producer checks globally by setting
+     * camel.health.producersEnabled=true.
      */
     public void setHealthCheckProducerEnabled(boolean healthCheckProducerEnabled) {
         this.healthCheckProducerEnabled = healthCheckProducerEnabled;


[camel] 02/02: CAMEL-19369: camel-jbang - Trace command to show endpoint uri

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch jbang-trace
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b1d6ce1e6d897e0d12edc278ac87359db21387d1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jul 29 17:29:49 2023 +0200

    CAMEL-19369: camel-jbang - Trace command to show endpoint uri
---
 .../dsl/jbang/core/commands/action/CamelTraceAction.java | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
index 9c1799584a6..e9269b91ca5 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
@@ -42,6 +42,7 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
 import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
 import org.apache.camel.util.json.JsonArray;
 import org.apache.camel.util.json.JsonObject;
 import org.apache.camel.util.json.Jsoner;
@@ -137,6 +138,10 @@ public class CamelTraceAction extends ActionBaseCommand {
                         description = "Only output traces from the latest (follow if necessary until complete and exit)")
     boolean latest;
 
+    @CommandLine.Option(names = { "--mask" },
+                        description = "Whether to mask endpoint URIs to avoid printing sensitive information such as password or access keys")
+    boolean mask;
+
     @CommandLine.Option(names = { "--pretty" },
                         description = "Pretty print message body when using JSon or XML format")
     boolean pretty;
@@ -421,6 +426,14 @@ public class CamelTraceAction extends ActionBaseCommand {
                     row.location = jo.getString("location");
                     row.routeId = jo.getString("routeId");
                     row.nodeId = jo.getString("nodeId");
+                    String uri = jo.getString("endpointUri");
+                    if (uri != null) {
+                        row.endpoint = new JsonObject();
+                        if (mask) {
+                            uri = URISupport.sanitizeUri(uri);
+                        }
+                        row.endpoint.put("endpoint", uri);
+                    }
                     Long ts = jo.getLong("timestamp");
                     if (ts != null) {
                         row.timestamp = ts;
@@ -696,7 +709,7 @@ public class CamelTraceAction extends ActionBaseCommand {
     }
 
     private String getDataAsTable(Row r) {
-        return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, null, r.message, r.exception);
+        return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, r.endpoint, r.message, r.exception);
     }
 
     private String getElapsed(Row r) {
@@ -780,6 +793,7 @@ public class CamelTraceAction extends ActionBaseCommand {
         long elapsed;
         boolean done;
         boolean failed;
+        JsonObject endpoint;
         JsonObject message;
         JsonObject exception;