You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2022/05/05 19:08:39 UTC

[camel] 01/01: CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent

This is an automated email from the ASF dual-hosted git repository.

bvahdat pushed a commit to branch CAMEL-18055
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3179795c3f165b54b0acf307547f9c5eb35a1f2a
Author: Babak Vahdat <bv...@apache.org>
AuthorDate: Thu May 5 21:08:14 2022 +0200

    CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
---
 .../decorators/ServiceBusSpanDecorator.java        | 108 +++++++++++++++++++++
 .../decorators/ServiceBusSpanDecoratorTest.java    |  81 ++++++++++++++++
 2 files changed, 189 insertions(+)

diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
new file mode 100644
index 00000000000..5a38f4489a0
--- /dev/null
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
@@ -0,0 +1,108 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.tracing.SpanAdapter;
+
+public class ServiceBusSpanDecorator extends AbstractMessagingSpanDecorator {
+
+    static final String SERVICEBUS_CONTENT_TYPE = "contentType";
+    static final String SERVICEBUS_CORRELATION_ID = "correlationId";
+    static final String SERVICEBUS_DELIVERY_COUNT = "deliveryCount";
+    static final String SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER = "enqueuedSequenceNumber";
+    static final String SERVICEBUS_ENQUEUED_TIME = "enqueuedTime";
+    static final String SERVICEBUS_EXPIRES_AT = "expiresAt";
+    static final String SERVICEBUS_PARTITION_KEY = "partitionKey";
+    static final String SERVICEBUS_REPLY_TO_SESSION_ID = "replyToSessionId";
+    static final String SERVICEBUS_SESSION_ID = "sessionId";
+    static final String SERVICEBUS_TIME_TO_LIVE = "ttl";
+
+    /**
+     * Constants copied from {@link org.apache.camel.component.azure.servicebus.ServiceBusConstants}
+     */
+    static final String CONTENT_TYPE = "CamelAzureServiceBusContentType";
+    static final String CORRELATION_ID = "CamelAzureServiceBusCorrelationId";
+    static final String DELIVERY_COUNT = "CamelAzureServiceBusDeliveryCount";
+    static final String ENQUEUED_SEQUENCE_NUMBER = "CamelAzureServiceBusEnqueuedSequenceNumber";
+    static final String ENQUEUED_TIME = "CamelAzureServiceBusEnqueuedTime";
+    static final String EXPIRES_AT = "CamelAzureServiceBusExpiresAt";
+    static final String MESSAGE_ID = "CamelAzureServiceBusMessageId";
+    static final String SESSION_ID = "CamelAzureServiceBusSessionId";
+    static final String REPLY_TO_SESSION_ID = "CamelAzureServiceBusReplyToSessionId";
+    static final String PARTITION_KEY = "CamelAzureServiceBusPartitionKey";
+    static final String TIME_TO_LIVE = "CamelAzureServiceBusTimeToLive";
+
+    @Override
+    public String getComponent() {
+        return "azure-servicebus";
+    }
+
+    @Override
+    public String getComponentClassName() {
+        return "org.apache.camel.component.azure.servicebus.ServiceBusComponent";
+    }
+
+    @Override
+    public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) {
+        super.pre(span, exchange, endpoint);
+
+        String contentType = exchange.getIn().getHeader(CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            span.setTag(SERVICEBUS_CONTENT_TYPE, contentType);
+        }
+
+        String correlationId = exchange.getIn().getHeader(CORRELATION_ID, String.class);
+        if (correlationId != null) {
+            span.setTag(SERVICEBUS_CORRELATION_ID, correlationId);
+        }
+
+        Long deliveryCount = exchange.getIn().getHeader(DELIVERY_COUNT, Long.class);
+        if (deliveryCount != null) {
+            span.setTag(SERVICEBUS_DELIVERY_COUNT, deliveryCount);
+        }
+
+        OffsetDateTime enqueuedSequenceNumber = exchange.getIn().getHeader(ENQUEUED_SEQUENCE_NUMBER, OffsetDateTime.class);
+        if (enqueuedSequenceNumber != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER, enqueuedSequenceNumber.toString());
+        }
+
+        OffsetDateTime enqueuedTime = exchange.getIn().getHeader(ENQUEUED_TIME, OffsetDateTime.class);
+        if (enqueuedTime != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_TIME, enqueuedTime.toString());
+        }
+
+        OffsetDateTime expiresAt = exchange.getIn().getHeader(EXPIRES_AT, OffsetDateTime.class);
+        if (expiresAt != null) {
+            span.setTag(SERVICEBUS_EXPIRES_AT, expiresAt.toString());
+        }
+
+        String partitionKey = exchange.getIn().getHeader(PARTITION_KEY, String.class);
+        if (partitionKey != null) {
+            span.setTag(SERVICEBUS_PARTITION_KEY, partitionKey);
+        }
+
+        String replyToSessionId = exchange.getIn().getHeader(REPLY_TO_SESSION_ID, String.class);
+        if (replyToSessionId != null) {
+            span.setTag(SERVICEBUS_REPLY_TO_SESSION_ID, replyToSessionId);
+        }
+
+        String sessionId = exchange.getIn().getHeader(SESSION_ID, String.class);
+        if (sessionId != null) {
+            span.setTag(SERVICEBUS_SESSION_ID, sessionId);
+        }
+
+        Duration timeToLive = exchange.getIn().getHeader(TIME_TO_LIVE, Duration.class);
+        if (timeToLive != null) {
+            span.setTag(SERVICEBUS_TIME_TO_LIVE, timeToLive.toString());
+        }
+    }
+
+    @Override
+    protected String getMessageId(Exchange exchange) {
+        return exchange.getIn().getHeader(MESSAGE_ID, String.class);
+    }
+
+}
diff --git a/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
new file mode 100644
index 00000000000..3006673d7ca
--- /dev/null
+++ b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
@@ -0,0 +1,81 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.tracing.MockSpanAdapter;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ServiceBusSpanDecoratorTest {
+
+    @Test
+    public void testGetMessageId() {
+        String messageId = "abcd";
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.MESSAGE_ID, String.class)).thenReturn(messageId);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        assertEquals(messageId, decorator.getMessageId(exchange));
+    }
+
+    @Test
+    public void testPre() {
+        String contentType = "application/json";
+        String correlationId = "1234";
+        Long deliveryCount = 27L;
+        OffsetDateTime enqueuedSequenceNumber = OffsetDateTime.now();
+        OffsetDateTime enqueuedTime = OffsetDateTime.now();
+        OffsetDateTime expiresAt = OffsetDateTime.now();
+        String partitionKey = "MyPartitionKey";
+        String replyToSessionId = "MyReplyToSessionId";
+        String sessionId = "4321";
+        Duration ttl = Duration.ofDays(7);
+
+        Endpoint endpoint = Mockito.mock(Endpoint.class);
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(endpoint.getEndpointUri()).thenReturn("azure-servicebus:topicOrQueueName");
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CONTENT_TYPE, String.class)).thenReturn(contentType);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CORRELATION_ID, String.class)).thenReturn(correlationId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.DELIVERY_COUNT, Long.class)).thenReturn(deliveryCount);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_SEQUENCE_NUMBER, OffsetDateTime.class))
+                .thenReturn(enqueuedSequenceNumber);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_TIME, OffsetDateTime.class)).thenReturn(enqueuedTime);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.EXPIRES_AT, OffsetDateTime.class)).thenReturn(expiresAt);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.PARTITION_KEY, String.class)).thenReturn(partitionKey);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.REPLY_TO_SESSION_ID, String.class)).thenReturn(replyToSessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.SESSION_ID, String.class)).thenReturn(sessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.TIME_TO_LIVE, Duration.class)).thenReturn(ttl);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        MockSpanAdapter span = new MockSpanAdapter();
+
+        decorator.pre(span, exchange, endpoint);
+
+        assertEquals(contentType, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CONTENT_TYPE));
+        assertEquals(correlationId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CORRELATION_ID));
+        assertEquals(deliveryCount, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_DELIVERY_COUNT));
+        assertEquals(enqueuedSequenceNumber.toString(),
+                span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER));
+        assertEquals(enqueuedTime.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_TIME));
+        assertEquals(expiresAt.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_EXPIRES_AT));
+        assertEquals(partitionKey, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_PARTITION_KEY));
+        assertEquals(replyToSessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_REPLY_TO_SESSION_ID));
+        assertEquals(sessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_SESSION_ID));
+        assertEquals(ttl.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_TIME_TO_LIVE));
+    }
+
+}