You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/30 06:40:53 UTC
[camel] branch main updated: Jbang trace (#10905)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 88950472467 Jbang trace (#10905)
88950472467 is described below
commit 8895047246799a8ac623333b1e9789b35e5757af
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Jul 30 08:40:47 2023 +0200
Jbang trace (#10905)
* Deprecated some unusued exchange constants
* CAMEL-19368: camel-core - Backlog tracer to capture endpoint uri of trace events when an exchange is send to endpoints with varioius EIPs.
* CAMEL-19369: camel-jbang - Trace command to show endpoint uri
---
.../camel/spi/BacklogTracerEventMessage.java | 6 ++
.../debugger/DefaultBacklogTracerEventMessage.java | 15 ++++
.../camel/impl/engine/CamelInternalProcessor.java | 86 +++++++++++++++++++++-
.../apache/camel/impl/engine/DefaultChannel.java | 2 +-
.../core/commands/action/CamelTraceAction.java | 16 +++-
5 files changed, 122 insertions(+), 3 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 1529d3d4151..69c2ed251f2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -121,6 +121,12 @@ public interface BacklogTracerEventMessage {
*/
String getExceptionAsJSon();
+ /**
+ * The endpoint uri if this trace is either from a route input (from), or the exchange was sent to an endpoint such
+ * as (to, toD, wireTap) etc.
+ */
+ String getEndpointUri();
+
/**
* Dumps the event message as XML using the {@link #ROOT_TAG} as root tag.
* <p/>
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index bcdeb3a24b5..f5e4cc70723 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -40,6 +40,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
private final String toNode;
private final String exchangeId;
private final String threadName;
+ private String endpointUri;
private final boolean rest;
private final boolean template;
private final String messageAsXml;
@@ -183,6 +184,14 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
this.exceptionAsJSon = exceptionAsJSon;
}
+ public String getEndpointUri() {
+ return endpointUri;
+ }
+
+ public void setEndpointUri(String endpointUri) {
+ this.endpointUri = endpointUri;
+ }
+
@Override
public String toString() {
return "DefaultBacklogTracerEventMessage[" + exchangeId + " at " + toNode + "]";
@@ -218,6 +227,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
}
// route id is optional and we then use an empty value for no route id
sb.append(prefix).append(" <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n");
+ if (endpointUri != null) {
+ sb.append(prefix).append(" <endpointUri>").append(endpointUri).append("</endpointUri>\n");
+ }
if (toNode != null) {
sb.append(prefix).append(" <toNode>").append(toNode).append("</toNode>\n");
} else {
@@ -254,6 +266,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven
if (location != null) {
jo.put("location", location);
}
+ if (endpointUri != null) {
+ jo.put("endpointUri", endpointUri);
+ }
if (routeId != null) {
jo.put("routeId", routeId);
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 08dc47d675a..ba72efb9b63 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -19,11 +19,14 @@ package org.apache.camel.impl.engine;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
@@ -39,8 +42,10 @@ import org.apache.camel.StreamCacheException;
import org.apache.camel.impl.debugger.BacklogDebugger;
import org.apache.camel.impl.debugger.BacklogTracer;
import org.apache.camel.impl.debugger.DefaultBacklogTracerEventMessage;
+import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelInternalProcessorAdvice;
import org.apache.camel.spi.Debugger;
+import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
@@ -63,6 +68,7 @@ import org.apache.camel.support.LoggerHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.SimpleEventNotifierSupport;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -552,6 +558,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
public static final class BacklogTracerAdvice
implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage>, Ordered {
+ private final BacklogTraceAdviceEventNotifier notifier;
+ private final CamelContext camelContext;
private final BacklogTracer backlogTracer;
private final NamedNode processorDefinition;
private final NamedRoute routeDefinition;
@@ -560,8 +568,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
private final boolean template;
private final boolean skip;
- public BacklogTracerAdvice(BacklogTracer backlogTracer, NamedNode processorDefinition,
+ public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer backlogTracer, NamedNode processorDefinition,
NamedRoute routeDefinition, boolean first) {
+ this.camelContext = camelContext;
this.backlogTracer = backlogTracer;
this.processorDefinition = processorDefinition;
this.routeDefinition = routeDefinition;
@@ -582,11 +591,28 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
} else {
this.skip = false;
}
+ this.notifier = getOrCreateEventNotifier(camelContext);
+ }
+
+ private BacklogTraceAdviceEventNotifier getOrCreateEventNotifier(CamelContext camelContext) {
+ // use a single instance of this event notifier
+ for (EventNotifier en : camelContext.getManagementStrategy().getEventNotifiers()) {
+ if (en instanceof BacklogTraceAdviceEventNotifier) {
+ return (BacklogTraceAdviceEventNotifier) en;
+ }
+ }
+ BacklogTraceAdviceEventNotifier answer = new BacklogTraceAdviceEventNotifier();
+ camelContext.getManagementStrategy().addEventNotifier(answer);
+ return answer;
}
@Override
public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception {
if (!skip && backlogTracer.shouldTrace(processorDefinition, exchange)) {
+
+ // to capture if the exchange was sent to an endpoint during this event
+ notifier.before(exchange);
+
long timestamp = System.currentTimeMillis();
String toNode = processorDefinition.getId();
String exchangeId = exchange.getExchangeId();
@@ -656,6 +682,25 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) {
data.doneProcessing();
+
+ String uri = null;
+ Endpoint endpoint = notifier.after(exchange);
+ if (endpoint != null) {
+ uri = endpoint.getEndpointUri();
+ } else if ((data.isFirst() || data.isLast()) && data.getToNode() == null && routeDefinition != null) {
+ // pseudo first/last event (the from in the route)
+ Route route = camelContext.getRoute(routeDefinition.getRouteId());
+ if (route != null && route.getConsumer() != null) {
+ // get the actual resolved uri
+ uri = route.getConsumer().getEndpoint().getEndpointUri();
+ } else {
+ uri = routeDefinition.getEndpointUrl();
+ }
+ }
+ if (uri != null) {
+ data.setEndpointUri(uri);
+ }
+
if (!data.isFirst()) {
// we want to capture if there was an exception
Throwable e = exchange.getException();
@@ -1212,4 +1257,43 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
}
}
+ /**
+ * Event notifier for {@link BacklogTracerAdvice} to capture
+ * {@link Exchange} sent to endpoints during tracing.
+ */
+ private static final class BacklogTraceAdviceEventNotifier extends SimpleEventNotifierSupport {
+
+ private final Object dummy = new Object();
+
+ private final ConcurrentMap<Exchange, Object> uris = new ConcurrentHashMap<>();
+
+ public BacklogTraceAdviceEventNotifier() {
+ // only capture sending events
+ setIgnoreExchangeEvents(false);
+ setIgnoreExchangeSendingEvents(false);
+ }
+
+ @Override
+ public void notify(CamelEvent event) throws Exception {
+ if (event instanceof CamelEvent.ExchangeSendingEvent ess) {
+ Exchange e = ess.getExchange();
+ if (uris.containsKey(e)) {
+ uris.put(e, ess.getEndpoint());
+ }
+ }
+ }
+
+ public void before(Exchange exchange) {
+ uris.put(exchange, dummy);
+ }
+
+ public Endpoint after(Exchange exchange) {
+ Object o = uris.remove(exchange);
+ if (o == dummy) {
+ return null;
+ }
+ return (Endpoint) o;
+ }
+
+ }
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index a3ec2d77988..5d8ede6ca66 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -196,7 +196,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel {
if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) {
// add jmx backlog tracer
BacklogTracer backlogTracer = getOrCreateBacklogTracer(camelContext);
- addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, routeDefinition, first));
+ addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition, first));
}
if (route.isTracing() || camelContext.isTracingStandby()) {
// add logger tracer
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
index 9c1799584a6..e9269b91ca5 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java
@@ -42,6 +42,7 @@ import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.StringHelper;
import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
import org.apache.camel.util.json.JsonArray;
import org.apache.camel.util.json.JsonObject;
import org.apache.camel.util.json.Jsoner;
@@ -137,6 +138,10 @@ public class CamelTraceAction extends ActionBaseCommand {
description = "Only output traces from the latest (follow if necessary until complete and exit)")
boolean latest;
+ @CommandLine.Option(names = { "--mask" },
+ description = "Whether to mask endpoint URIs to avoid printing sensitive information such as password or access keys")
+ boolean mask;
+
@CommandLine.Option(names = { "--pretty" },
description = "Pretty print message body when using JSon or XML format")
boolean pretty;
@@ -421,6 +426,14 @@ public class CamelTraceAction extends ActionBaseCommand {
row.location = jo.getString("location");
row.routeId = jo.getString("routeId");
row.nodeId = jo.getString("nodeId");
+ String uri = jo.getString("endpointUri");
+ if (uri != null) {
+ row.endpoint = new JsonObject();
+ if (mask) {
+ uri = URISupport.sanitizeUri(uri);
+ }
+ row.endpoint.put("endpoint", uri);
+ }
Long ts = jo.getLong("timestamp");
if (ts != null) {
row.timestamp = ts;
@@ -696,7 +709,7 @@ public class CamelTraceAction extends ActionBaseCommand {
}
private String getDataAsTable(Row r) {
- return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, null, r.message, r.exception);
+ return tableHelper.getDataAsTable(r.exchangeId, r.exchangePattern, r.endpoint, r.message, r.exception);
}
private String getElapsed(Row r) {
@@ -780,6 +793,7 @@ public class CamelTraceAction extends ActionBaseCommand {
long elapsed;
boolean done;
boolean failed;
+ JsonObject endpoint;
JsonObject message;
JsonObject exception;