You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/06/13 05:45:29 UTC
[camel] 01/02: CAMEL-12573: Fixing class cast exception. Handling
kafka.Partition as an Integer and kafka.Offset as a Long.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d36c11b35a3eb839c320897a7ba3beb5b3fe9891
Author: akhettar <ay...@zotix.co>
AuthorDate: Tue Jun 12 16:31:12 2018 +0100
CAMEL-12573: Fixing class cast exception. Handling kafka.Partition as an Integer and kafka.Offset as a Long.
---
.../opentracing/decorators/KafkaSpanDecorator.java | 16 +++++++++--
.../decorators/KafkaSpanDecoratorTest.java | 32 ++++++++++++++++++++--
2 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
index ecc9279..1ec0a68 100644
--- a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
+++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java
@@ -58,7 +58,7 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator {
public void pre(Span span, Exchange exchange, Endpoint endpoint) {
super.pre(span, exchange, endpoint);
- String partition = (String)exchange.getIn().getHeader(PARTITION);
+ String partition = getValue(exchange, PARTITION, Integer.class);
if (partition != null) {
span.setTag(KAFKA_PARTITION_TAG, partition);
}
@@ -73,10 +73,22 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator {
span.setTag(KAFKA_KEY_TAG, key);
}
- String offset = (String)exchange.getIn().getHeader(OFFSET);
+ String offset = getValue(exchange, OFFSET, Long.class);
if (offset != null) {
span.setTag(KAFKA_OFFSET_TAG, offset);
}
}
+ /**
+ * Reads the named header from the exchange's in message and renders it as a String.
+ * @param exchange the {@link Exchange} whose in message holds the header
+ * @param header the header name
+ * @param type the class the header value is first requested as
+ * @return the typed value via String.valueOf, else the header requested as a String
+ */
+ private <T> String getValue(final Exchange exchange, final String header, Class<T> type) {
+ final T typed = exchange.getIn().getHeader(header, type);
+ return typed == null ? exchange.getIn().getHeader(header, String.class) : String.valueOf(typed);
+ }
+
}
diff --git a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
index b3eda9c..2c51def 100644
--- a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
+++ b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.opentracing.decorators;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
+import jdk.nashorn.internal.IntDeque;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -59,7 +60,7 @@ public class KafkaSpanDecoratorTest {
}
@Test
- public void testPre() {
+ public void testPreOffsetAndPartitionAsStringHeader() {
String testKey = "TestKey";
String testOffset = "TestOffset";
String testPartition = "TestPartition";
@@ -72,8 +73,8 @@ public class KafkaSpanDecoratorTest {
Mockito.when(endpoint.getEndpointUri()).thenReturn("test");
Mockito.when(exchange.getIn()).thenReturn(message);
Mockito.when(message.getHeader(KafkaSpanDecorator.KEY)).thenReturn(testKey);
- Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET)).thenReturn(testOffset);
- Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION)).thenReturn(testPartition);
+ Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, String.class)).thenReturn(testOffset);
+ Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, String.class)).thenReturn(testPartition);
Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION_KEY)).thenReturn(testPartitionKey);
SpanDecorator decorator = new KafkaSpanDecorator();
@@ -89,4 +90,29 @@ public class KafkaSpanDecoratorTest {
assertEquals(testPartitionKey, span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_KEY_TAG));
}
+ @Test
+ public void testPrePartitionAsIntegerHeaderAndOffsetAsLongHeader() {
+ Long testOffset = 4875454L;
+ Integer testPartition = 0;
+ // Only the typed (Long / Integer) header lookups are stubbed below, not the String ones.
+ Endpoint endpoint = Mockito.mock(Endpoint.class);
+ Exchange exchange = Mockito.mock(Exchange.class);
+ Message message = Mockito.mock(Message.class);
+
+ Mockito.when(endpoint.getEndpointUri()).thenReturn("test");
+ Mockito.when(exchange.getIn()).thenReturn(message);
+ Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, Long.class)).thenReturn(testOffset);
+ Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, Integer.class)).thenReturn(testPartition);
+
+ SpanDecorator decorator = new KafkaSpanDecorator();
+
+ MockTracer tracer = new MockTracer();
+ MockSpan span = tracer.buildSpan("TestSpan").start();
+
+ decorator.pre(span, exchange, endpoint);
+ // The span tags are expected to carry the String.valueOf rendering of the numeric headers.
+ assertEquals(String.valueOf(testOffset), span.tags().get(KafkaSpanDecorator.KAFKA_OFFSET_TAG));
+ assertEquals(String.valueOf(testPartition), span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_TAG));
+ }
+
}
--
To stop receiving notification emails like this one, please contact
acosentino@apache.org.