You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/06/13 05:45:29 UTC

[camel] 01/02: CAMEL-12573: Fixing class cast exception. Handling kafka.Partition as Integer and kafka.Offset as a Long.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d36c11b35a3eb839c320897a7ba3beb5b3fe9891
Author: akhettar <ay...@zotix.co>
AuthorDate: Tue Jun 12 16:31:12 2018 +0100

    CAMEL-12573: Fixing class cast exception. Handling kafka.Partition as Integer and kafka.Offset as a Long.
---
 .../opentracing/decorators/KafkaSpanDecorator.java | 16 +++++++++--
 .../decorators/KafkaSpanDecoratorTest.java         | 32 ++++++++++++++++++++--
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
index ecc9279..1ec0a68 100644
--- a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
+++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
@@ -58,7 +58,7 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator {
     public void pre(Span span, Exchange exchange, Endpoint endpoint) {
         super.pre(span, exchange, endpoint);
 
-        String partition = (String)exchange.getIn().getHeader(PARTITION);
+        String partition = getValue(exchange, PARTITION, Integer.class);
         if (partition != null) {
             span.setTag(KAFKA_PARTITION_TAG, partition);
         }
@@ -73,10 +73,22 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator {
             span.setTag(KAFKA_KEY_TAG, key);
         }
 
-        String offset = (String)exchange.getIn().getHeader(OFFSET);
+        String offset = getValue(exchange, OFFSET, Long.class);
         if (offset != null) {
             span.setTag(KAFKA_OFFSET_TAG, offset);
         }
     }
 
+    /**
+     * Reads a header from the exchange's in message and renders it as a String.
+     * @param exchange the {@link Exchange}
+     * @param header the header name
+     * @param type the type the header is first requested as, before falling back to String
+     * @return String.valueOf of the typed header if non-null, else the header requested as a String
+     */
+    private <T> String getValue(final Exchange exchange, final String header, Class<T> type) {
+         T partition = exchange.getIn().getHeader(header, type);
+         return partition != null ? String.valueOf(partition) : exchange.getIn().getHeader(header, String.class);
+    }
+
 }
diff --git a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
index b3eda9c..2c51def 100644
--- a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
+++ b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.opentracing.decorators;
 import io.opentracing.mock.MockSpan;
 import io.opentracing.mock.MockTracer;
 
+import jdk.nashorn.internal.IntDeque;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -59,7 +60,7 @@ public class KafkaSpanDecoratorTest {
     }
 
     @Test
-    public void testPre() {
+    public void testPreOffsetAndPartitionAsStringHeader() {
         String testKey = "TestKey";
         String testOffset = "TestOffset";
         String testPartition = "TestPartition";
@@ -72,8 +73,8 @@ public class KafkaSpanDecoratorTest {
         Mockito.when(endpoint.getEndpointUri()).thenReturn("test");
         Mockito.when(exchange.getIn()).thenReturn(message);
         Mockito.when(message.getHeader(KafkaSpanDecorator.KEY)).thenReturn(testKey);
-        Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET)).thenReturn(testOffset);
-        Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION)).thenReturn(testPartition);
+        Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, String.class)).thenReturn(testOffset);
+        Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, String.class)).thenReturn(testPartition);
         Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION_KEY)).thenReturn(testPartitionKey);
 
         SpanDecorator decorator = new KafkaSpanDecorator();
@@ -89,4 +90,29 @@ public class KafkaSpanDecoratorTest {
         assertEquals(testPartitionKey, span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_KEY_TAG));
     }
 
+    @Test
+    public void testPrePartitionAsIntegerHeaderAndOffsetAsLongHeader() {
+        Long testOffset = 4875454l; // NOTE(review): lowercase 'l' suffix is easy to misread as '1'; 'L' is clearer
+        Integer testPartition = 0;
+        // Mockito mocks stand in for the Camel endpoint, exchange and message.
+        Endpoint endpoint = Mockito.mock(Endpoint.class);
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+        // Stub the typed header lookups: OFFSET as a Long, PARTITION as an Integer.
+        Mockito.when(endpoint.getEndpointUri()).thenReturn("test");
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, Long.class)).thenReturn(testOffset);
+        Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, Integer.class)).thenReturn(testPartition);
+
+        SpanDecorator decorator = new KafkaSpanDecorator();
+
+        MockTracer tracer = new MockTracer();
+        MockSpan span = tracer.buildSpan("TestSpan").start();
+        // Apply the decorator to the mock span using the stubbed exchange.
+        decorator.pre(span, exchange, endpoint);
+        // Each tag should equal String.valueOf of the corresponding stubbed header.
+        assertEquals(String.valueOf(testOffset), span.tags().get(KafkaSpanDecorator.KAFKA_OFFSET_TAG));
+        assertEquals(String.valueOf(testPartition), span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_TAG));
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
acosentino@apache.org.