You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/09/29 17:19:39 UTC

[kafka] branch 2.0 updated: KAFKA-7434: Fix NPE in DeadLetterQueueReporter

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 74c8b83  KAFKA-7434: Fix NPE in DeadLetterQueueReporter
74c8b83 is described below

commit 74c8b831472ed07e10ceda660e0e504a6a6821c4
Author: Michał Borowiecki <mb...@gmail.com>
AuthorDate: Sat Sep 29 10:19:10 2018 -0700

    KAFKA-7434: Fix NPE in DeadLetterQueueReporter
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Michał Borowiecki <mb...@gmail.com>
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5700 from mihbor/KAFKA-7434
    
    (cherry picked from commit 22f1724123c267352116c18db1abdee25c31b382)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../runtime/errors/DeadLetterQueueReporter.java    |  6 ++++-
 .../connect/runtime/errors/ErrorReporterTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c059dcf..2312269 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -199,6 +199,10 @@ public class DeadLetterQueueReporter implements ErrorReporter {
     }
 
     private byte[] toBytes(String value) {
-        return value.getBytes(StandardCharsets.UTF_8);
+        if (value != null) {
+            return value.getBytes(StandardCharsets.UTF_8);
+        } else {
+            return null;
+        }
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index fa628b0..00a922f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -59,6 +59,7 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -205,6 +206,7 @@ public class ErrorReporterTest {
         assertEquals(configuration.dlqTopicReplicationFactor(), 7);
     }
 
+    @Test
     public void testDlqHeaderConsumerRecord() {
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
@@ -233,6 +235,34 @@ public class ErrorReporterTest {
     }
 
     @Test
+    public void testDlqHeaderOnNullExceptionMessage() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new NullPointerException());
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION));
+        assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET));
+        assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME));
+        assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+        assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE));
+        assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+        assertEquals(NullPointerException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+        assertNull(producerRecord.headers().lastHeader(ERROR_HEADER_EXCEPTION_MESSAGE).value());
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("java.lang.NullPointerException"));
+    }
+
+    @Test
     public void testDlqHeaderIsAppended() {
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);