You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2022/05/05 19:08:39 UTC
[camel] 01/01: CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
This is an automated email from the ASF dual-hosted git repository.
bvahdat pushed a commit to branch CAMEL-18055
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3179795c3f165b54b0acf307547f9c5eb35a1f2a
Author: Babak Vahdat <bv...@apache.org>
AuthorDate: Thu May 5 21:08:14 2022 +0200
CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
---
.../decorators/ServiceBusSpanDecorator.java | 108 +++++++++++++++++++++
.../decorators/ServiceBusSpanDecoratorTest.java | 81 ++++++++++++++++
2 files changed, 189 insertions(+)
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
new file mode 100644
index 00000000000..5a38f4489a0
--- /dev/null
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
@@ -0,0 +1,108 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.tracing.SpanAdapter;
+
+public class ServiceBusSpanDecorator extends AbstractMessagingSpanDecorator {
+
+ static final String SERVICEBUS_CONTENT_TYPE = "contentType";
+ static final String SERVICEBUS_CORRELATION_ID = "correlationId";
+ static final String SERVICEBUS_DELIVERY_COUNT = "deliveryCount";
+ static final String SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER = "enqueuedSequenceNumber";
+ static final String SERVICEBUS_ENQUEUED_TIME = "enqueuedTime";
+ static final String SERVICEBUS_EXPIRES_AT = "expiresAt";
+ static final String SERVICEBUS_PARTITION_KEY = "partitionKey";
+ static final String SERVICEBUS_REPLY_TO_SESSION_ID = "replyToSessionId";
+ static final String SERVICEBUS_SESSION_ID = "sessionId";
+ static final String SERVICEBUS_TIME_TO_LIVE = "ttl";
+
+ /**
+ * Constants copied from {@link org.apache.camel.component.azure.servicebus.ServiceBusConstants}
+ */
+ static final String CONTENT_TYPE = "CamelAzureServiceBusContentType";
+ static final String CORRELATION_ID = "CamelAzureServiceBusCorrelationId";
+ static final String DELIVERY_COUNT = "CamelAzureServiceBusDeliveryCount";
+ static final String ENQUEUED_SEQUENCE_NUMBER = "CamelAzureServiceBusEnqueuedSequenceNumber";
+ static final String ENQUEUED_TIME = "CamelAzureServiceBusEnqueuedTime";
+ static final String EXPIRES_AT = "CamelAzureServiceBusExpiresAt";
+ static final String MESSAGE_ID = "CamelAzureServiceBusMessageId";
+ static final String SESSION_ID = "CamelAzureServiceBusSessionId";
+ static final String REPLY_TO_SESSION_ID = "CamelAzureServiceBusReplyToSessionId";
+ static final String PARTITION_KEY = "CamelAzureServiceBusPartitionKey";
+ static final String TIME_TO_LIVE = "CamelAzureServiceBusTimeToLive";
+
+ @Override
+ public String getComponent() {
+ return "azure-servicebus";
+ }
+
+ @Override
+ public String getComponentClassName() {
+ return "org.apache.camel.component.azure.servicebus.ServiceBusComponent";
+ }
+
+ @Override
+ public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) {
+ super.pre(span, exchange, endpoint);
+
+ String contentType = exchange.getIn().getHeader(CONTENT_TYPE, String.class);
+ if (contentType != null) {
+ span.setTag(SERVICEBUS_CONTENT_TYPE, contentType);
+ }
+
+ String correlationId = exchange.getIn().getHeader(CORRELATION_ID, String.class);
+ if (correlationId != null) {
+ span.setTag(SERVICEBUS_CORRELATION_ID, correlationId);
+ }
+
+ Long deliveryCount = exchange.getIn().getHeader(DELIVERY_COUNT, Long.class);
+ if (deliveryCount != null) {
+ span.setTag(SERVICEBUS_DELIVERY_COUNT, deliveryCount);
+ }
+
+ OffsetDateTime enqueuedSequenceNumber = exchange.getIn().getHeader(ENQUEUED_SEQUENCE_NUMBER, OffsetDateTime.class);
+ if (enqueuedSequenceNumber != null) {
+ span.setTag(SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER, enqueuedSequenceNumber.toString());
+ }
+
+ OffsetDateTime enqueuedTime = exchange.getIn().getHeader(ENQUEUED_TIME, OffsetDateTime.class);
+ if (enqueuedTime != null) {
+ span.setTag(SERVICEBUS_ENQUEUED_TIME, enqueuedTime.toString());
+ }
+
+ OffsetDateTime expiresAt = exchange.getIn().getHeader(EXPIRES_AT, OffsetDateTime.class);
+ if (expiresAt != null) {
+ span.setTag(SERVICEBUS_EXPIRES_AT, expiresAt.toString());
+ }
+
+ String partitionKey = exchange.getIn().getHeader(PARTITION_KEY, String.class);
+ if (partitionKey != null) {
+ span.setTag(SERVICEBUS_PARTITION_KEY, partitionKey);
+ }
+
+ String replyToSessionId = exchange.getIn().getHeader(REPLY_TO_SESSION_ID, String.class);
+ if (replyToSessionId != null) {
+ span.setTag(SERVICEBUS_REPLY_TO_SESSION_ID, replyToSessionId);
+ }
+
+ String sessionId = exchange.getIn().getHeader(SESSION_ID, String.class);
+ if (sessionId != null) {
+ span.setTag(SERVICEBUS_SESSION_ID, sessionId);
+ }
+
+ Duration timeToLive = exchange.getIn().getHeader(TIME_TO_LIVE, Duration.class);
+ if (timeToLive != null) {
+ span.setTag(SERVICEBUS_TIME_TO_LIVE, timeToLive.toString());
+ }
+ }
+
+ @Override
+ protected String getMessageId(Exchange exchange) {
+ return exchange.getIn().getHeader(MESSAGE_ID, String.class);
+ }
+
+}
diff --git a/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
new file mode 100644
index 00000000000..3006673d7ca
--- /dev/null
+++ b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
@@ -0,0 +1,81 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.tracing.MockSpanAdapter;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ServiceBusSpanDecoratorTest {
+
+ @Test
+ public void testGetMessageId() {
+ String messageId = "abcd";
+ Exchange exchange = Mockito.mock(Exchange.class);
+ Message message = Mockito.mock(Message.class);
+
+ Mockito.when(exchange.getIn()).thenReturn(message);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.MESSAGE_ID, String.class)).thenReturn(messageId);
+
+ AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+ assertEquals(messageId, decorator.getMessageId(exchange));
+ }
+
+ @Test
+ public void testPre() {
+ String contentType = "application/json";
+ String correlationId = "1234";
+ Long deliveryCount = 27L;
+ OffsetDateTime enqueuedSequenceNumber = OffsetDateTime.now();
+ OffsetDateTime enqueuedTime = OffsetDateTime.now();
+ OffsetDateTime expiresAt = OffsetDateTime.now();
+ String partitionKey = "MyPartitionKey";
+ String replyToSessionId = "MyReplyToSessionId";
+ String sessionId = "4321";
+ Duration ttl = Duration.ofDays(7);
+
+ Endpoint endpoint = Mockito.mock(Endpoint.class);
+ Exchange exchange = Mockito.mock(Exchange.class);
+ Message message = Mockito.mock(Message.class);
+
+ Mockito.when(endpoint.getEndpointUri()).thenReturn("azure-servicebus:topicOrQueueName");
+ Mockito.when(exchange.getIn()).thenReturn(message);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.CONTENT_TYPE, String.class)).thenReturn(contentType);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.CORRELATION_ID, String.class)).thenReturn(correlationId);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.DELIVERY_COUNT, Long.class)).thenReturn(deliveryCount);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_SEQUENCE_NUMBER, OffsetDateTime.class))
+ .thenReturn(enqueuedSequenceNumber);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_TIME, OffsetDateTime.class)).thenReturn(enqueuedTime);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.EXPIRES_AT, OffsetDateTime.class)).thenReturn(expiresAt);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.PARTITION_KEY, String.class)).thenReturn(partitionKey);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.REPLY_TO_SESSION_ID, String.class)).thenReturn(replyToSessionId);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.SESSION_ID, String.class)).thenReturn(sessionId);
+ Mockito.when(message.getHeader(ServiceBusSpanDecorator.TIME_TO_LIVE, Duration.class)).thenReturn(ttl);
+
+ AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+ MockSpanAdapter span = new MockSpanAdapter();
+
+ decorator.pre(span, exchange, endpoint);
+
+ assertEquals(contentType, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CONTENT_TYPE));
+ assertEquals(correlationId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CORRELATION_ID));
+ assertEquals(deliveryCount, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_DELIVERY_COUNT));
+ assertEquals(enqueuedSequenceNumber.toString(),
+ span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER));
+ assertEquals(enqueuedTime.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_TIME));
+ assertEquals(expiresAt.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_EXPIRES_AT));
+ assertEquals(partitionKey, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_PARTITION_KEY));
+ assertEquals(replyToSessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_REPLY_TO_SESSION_ID));
+ assertEquals(sessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_SESSION_ID));
+ assertEquals(ttl.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_TIME_TO_LIVE));
+ }
+
+}