Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/11 22:17:17 UTC

[kafka] branch 2.0 updated: KAFKA-7003: Set error context in message headers (KIP-298)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new fc2578c  KAFKA-7003: Set error context in message headers (KIP-298)
fc2578c is described below

commit fc2578cf26ae759280f8b9cbd0056f96681d4cbf
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Mon Jun 11 15:16:46 2018 -0700

    KAFKA-7003: Set error context in message headers (KIP-298)
    
    If the property `errors.deadletterqueue.context.headers.enable` is set to true, a set of headers describing the context in which the error took place is added to each message written to the dead letter queue.
    
    A unit test is added to check the correctness of header creation.
    
    Signed-off-by: Arjun Satish <arjun@confluent.io>
    
    Author: Arjun Satish <ar...@confluent.io>
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5159 from wicknicks/KAFKA-7003
    
    (cherry picked from commit 3face7fce2489715c040c9756ec05406aaa657d4)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../kafka/connect/runtime/SinkConnectorConfig.java | 14 +++-
 .../org/apache/kafka/connect/runtime/Worker.java   |  2 +-
 .../runtime/errors/DeadLetterQueueReporter.java    | 77 ++++++++++++++++++++-
 .../connect/runtime/errors/ErrorReporterTest.java  | 80 +++++++++++++++++++++-
 4 files changed, 166 insertions(+), 7 deletions(-)
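For context, a sink connector opts into this behavior purely through configuration. Below is a minimal sketch in Java of the relevant connector properties; the connector name, class, and topic names are hypothetical placeholders, while the `errors.*` keys are the ones defined by KIP-298 (`errors.deadletterqueue.context.headers.enable` being the option this commit adds):

    import java.util.HashMap;
    import java.util.Map;

    public class DlqConfigSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("name", "example-sink");                                   // hypothetical connector name
            props.put("connector.class", "com.example.ExampleSinkConnector");    // hypothetical connector class
            props.put("topics", "example-topic");                                // hypothetical source of records
            props.put("errors.tolerance", "all");                                // keep the task running when a record fails
            props.put("errors.deadletterqueue.topic.name", "example-dlq");       // route failed records to this topic
            props.put("errors.deadletterqueue.context.headers.enable", "true");  // new in this commit; defaults to false
            System.out.println(props);
        }
    }

With the new flag left at its default of false, failed records still reach the dead letter queue but carry no `__connect.errors.` headers.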

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 6e9bd6b..d9d140b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -57,11 +57,19 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
     private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
 
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
+            "written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
+            "keys will start with <code>__connect.errors.</code>";
+    private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
+
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
         .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
         .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
+        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
@@ -107,4 +115,8 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public short dlqTopicReplicationFactor() {
         return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
     }
+
+    public boolean isDlqContextHeadersEnabled() {
+        return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
+    }
 }
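A side note on the ConfigDef change above: because the new key is defined with `ConfigDef.Type.BOOLEAN` and a default, parsing applies both type conversion and the default automatically. A minimal sketch, assuming hypothetical values for the required `name` and `connector.class` settings:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.runtime.SinkConnectorConfig;

    public class DlqConfigDefSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("name", "example-sink");                                // hypothetical
            props.put("connector.class", "com.example.ExampleSinkConnector"); // hypothetical
            props.put("errors.deadletterqueue.context.headers.enable", "true");

            // parse() validates types and fills in defaults, so the BOOLEAN key
            // comes back as a Boolean rather than the raw String "true".
            Map<String, Object> parsed = SinkConnectorConfig.configDef().parse(props);
            System.out.println(parsed.get(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG)); // true
        }
    }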
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 97e68fa..c794eb8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -530,7 +530,7 @@ public class Worker {
         // check if topic for dead letter queue exists
         String topic = connConfig.dlqTopicName();
         if (topic != null && !topic.isEmpty()) {
-            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+            DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
             reporters.add(reporter);
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 459eeae..d36ec22 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -22,13 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -46,12 +52,26 @@ public class DeadLetterQueueReporter implements ErrorReporter {
 
     private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
 
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+    public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+    public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+    public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id";
+    public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+    public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
     private final SinkConnectorConfig connConfig;
+    private final ConnectorTaskId connectorTaskId;
 
     private KafkaProducer<byte[], byte[]> kafkaProducer;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+                                                         ConnectorTaskId id,
                                                          SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
         String topic = sinkConfig.dlqTopicName();
 
@@ -70,7 +90,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
     }
 
     /**
@@ -79,9 +99,10 @@ public class DeadLetterQueueReporter implements ErrorReporter {
      * @param kafkaProducer a Kafka Producer to produce the original consumed records.
      */
     // Visible for testing
-    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) {
+    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) {
         this.kafkaProducer = kafkaProducer;
         this.connConfig = connConfig;
+        this.connectorTaskId = id;
     }
 
     @Override
@@ -117,6 +138,10 @@ public class DeadLetterQueueReporter implements ErrorReporter {
                     originalMessage.key(), originalMessage.value(), originalMessage.headers());
         }
 
+        if (connConfig.isDlqContextHeadersEnabled()) {
+            populateContextHeaders(producerRecord, context);
+        }
+
         this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
             if (exception != null) {
                 log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
@@ -124,4 +149,52 @@ public class DeadLetterQueueReporter implements ErrorReporter {
             }
         });
     }
+
+    // Visible for testing
+    void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext context) {
+        Headers headers = producerRecord.headers();
+        if (context.consumerRecord() != null) {
+            headers.add(ERROR_HEADER_ORIG_TOPIC, toBytes(context.consumerRecord().topic()));
+            headers.add(ERROR_HEADER_ORIG_PARTITION, toBytes(context.consumerRecord().partition()));
+            headers.add(ERROR_HEADER_ORIG_OFFSET, toBytes(context.consumerRecord().offset()));
+        }
+
+        headers.add(ERROR_HEADER_CONNECTOR_NAME, toBytes(connectorTaskId.connector()));
+        headers.add(ERROR_HEADER_TASK_ID, toBytes(String.valueOf(connectorTaskId.task())));
+        headers.add(ERROR_HEADER_STAGE, toBytes(context.stage().name()));
+        headers.add(ERROR_HEADER_EXECUTING_CLASS, toBytes(context.executingClass().getName()));
+        if (context.error() != null) {
+            headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
+            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
+            byte[] trace;
+            if ((trace = stacktrace(context.error())) != null) {
+                headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
+            }
+        }
+    }
+
+    private byte[] stacktrace(Throwable error) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            PrintStream stream = new PrintStream(bos, true, "UTF-8");
+            error.printStackTrace(stream);
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            log.error("Could not serialize stacktrace.", e);
+        }
+        return null;
+    }
+
+    private byte[] toBytes(int value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(long value) {
+        return toBytes(String.valueOf(value));
+    }
+
+    private byte[] toBytes(String value) {
+        return value.getBytes(StandardCharsets.UTF_8);
+    }
 }
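Downstream, the context headers written by populateContextHeaders() can be read back from the dead letter queue with a plain consumer. A minimal sketch, assuming a hypothetical broker address, group id, and DLQ topic name; header values are decoded as UTF-8 to mirror the toBytes() helpers above:

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;

    public class DlqHeaderReaderSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical
            props.put("group.id", "dlq-inspector");            // hypothetical
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-dlq")); // hypothetical DLQ topic
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Error context headers all share the __connect.errors. prefix.
                    for (Header header : record.headers()) {
                        if (header.key().startsWith("__connect.errors.")) {
                            System.out.println(header.key() + " = "
                                    + new String(header.value(), StandardCharsets.UTF_8));
                        }
                    }
                }
            }
        }
    }

Since the reporter appends rather than overwrites (see testDlqHeaderIsAppended in the test diff below), a key can appear more than once; Headers.lastHeader() returns the value written most recently.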
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f35c514..f199982 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -26,6 +29,7 @@ import org.apache.kafka.connect.runtime.MockConnectMetrics;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.Mock;
@@ -43,8 +47,19 @@ import java.util.concurrent.Future;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -81,7 +96,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testDLQConfigWithEmptyTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -96,7 +111,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testDLQConfigWithValidTopicName() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -111,7 +126,7 @@ public class ErrorReporterTest {
 
     @Test
     public void testReportDLQTwice() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
         deadLetterQueueReporter.metrics(errorHandlingMetrics);
 
         ProcessingContext context = processingContext();
@@ -189,6 +204,65 @@
         assertEquals(configuration.dlqTopicReplicationFactor(), 7);
     }
 
+    @Test
+    public void testDlqHeaderConsumerRecord() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION));
+        assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET));
+        assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME));
+        assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+        assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE));
+        assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+        assertEquals(ConnectException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+        assertEquals("Test Exception", headerValue(producerRecord, ERROR_HEADER_EXCEPTION_MESSAGE));
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+        assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception"));
+    }
+
+    @Test
+    public void testDlqHeaderIsAppended() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new ConnectException("Test Exception"));
+
+        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+        producerRecord.headers().add(ERROR_HEADER_ORIG_TOPIC, "dummy".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+        int appearances = 0;
+        for (Header header: producerRecord.headers()) {
+            if (ERROR_HEADER_ORIG_TOPIC.equalsIgnoreCase(header.key())) {
+                appearances++;
+            }
+        }
+
+        assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+        assertEquals(2, appearances);
+    }
+
+    private String headerValue(ProducerRecord<byte[], byte[]> producerRecord, String headerKey) {
+        return new String(producerRecord.headers().lastHeader(headerKey).value());
+    }
+
     private ProcessingContext processingContext() {
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));
