You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/11 22:17:17 UTC
[kafka] branch 2.0 updated: KAFKA-7003: Set error context in
message headers (KIP-298)
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new fc2578c KAFKA-7003: Set error context in message headers (KIP-298)
fc2578c is described below
commit fc2578cf26ae759280f8b9cbd0056f96681d4cbf
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Mon Jun 11 15:16:46 2018 -0700
KAFKA-7003: Set error context in message headers (KIP-298)
If the property `errors.deadletterqueue.context.headers.enable` is set to true, add a set of headers to the message describing the context under which the error took place.
A unit test is added to check the correctness of header creation.
Signed-off-by: Arjun Satish <arjunconfluent.io>
Author: Arjun Satish <ar...@confluent.io>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5159 from wicknicks/KAFKA-7003
(cherry picked from commit 3face7fce2489715c040c9756ec05406aaa657d4)
Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
.../kafka/connect/runtime/SinkConnectorConfig.java | 14 +++-
.../org/apache/kafka/connect/runtime/Worker.java | 2 +-
.../runtime/errors/DeadLetterQueueReporter.java | 77 ++++++++++++++++++++-
.../connect/runtime/errors/ErrorReporterTest.java | 79 +++++++++++++++++++++-
4 files changed, 165 insertions(+), 7 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 6e9bd6b..d9d140b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -57,11 +57,19 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
+ public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+ public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+    // User-facing description of the context-headers switch; shown in generated configuration docs.
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
+            "written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
+            "keys will start with <code>__connect.errors.</code>";
+ private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
+
static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
- .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
+ .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+ .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
public static ConfigDef configDef() {
return config;
@@ -107,4 +115,8 @@ public class SinkConnectorConfig extends ConnectorConfig {
public short dlqTopicReplicationFactor() {
return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
}
+
+ /**
+ * Reports whether error context headers should be added to records written to the dead letter queue.
+ *
+ * @return the boolean value configured under {@link #DLQ_CONTEXT_HEADERS_ENABLE_CONFIG}
+ * (declared default: {@link #DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT})
+ */
+ public boolean isDlqContextHeadersEnabled() {
+ return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 97e68fa..c794eb8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -530,7 +530,7 @@ public class Worker {
// check if topic for dead letter queue exists
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
- DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+ DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
reporters.add(reporter);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 459eeae..d36ec22 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -22,13 +22,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -46,12 +52,26 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
+ public static final String HEADER_PREFIX = "__connect.errors.";
+ public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+ public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition";
+ public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset";
+ public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name";
+ public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id";
+ public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage";
+ public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name";
+ public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+ public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+ public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+
private final SinkConnectorConfig connConfig;
+ private final ConnectorTaskId connectorTaskId;
private KafkaProducer<byte[], byte[]> kafkaProducer;
private ErrorHandlingMetrics errorHandlingMetrics;
public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
+ ConnectorTaskId id,
SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
String topic = sinkConfig.dlqTopicName();
@@ -70,7 +90,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
}
KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
- return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
+ return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id);
}
/**
@@ -79,9 +99,10 @@ public class DeadLetterQueueReporter implements ErrorReporter {
* @param kafkaProducer a Kafka Producer to produce the original consumed records.
*/
// Visible for testing
- DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig) {
+ DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id) {
this.kafkaProducer = kafkaProducer;
this.connConfig = connConfig;
+ this.connectorTaskId = id;
}
@Override
@@ -117,6 +138,10 @@ public class DeadLetterQueueReporter implements ErrorReporter {
originalMessage.key(), originalMessage.value(), originalMessage.headers());
}
+ if (connConfig.isDlqContextHeadersEnabled()) {
+ populateContextHeaders(producerRecord, context);
+ }
+
this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
log.error("Could not produce message to dead letter queue. topic=" + dlqTopicName, exception);
@@ -124,4 +149,52 @@ public class DeadLetterQueueReporter implements ErrorReporter {
}
});
}
+
+ // Visible for testing
+ void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext context) {
+ Headers headers = producerRecord.headers();
+ if (context.consumerRecord() != null) {
+ headers.add(ERROR_HEADER_ORIG_TOPIC, toBytes(context.consumerRecord().topic()));
+ headers.add(ERROR_HEADER_ORIG_PARTITION, toBytes(context.consumerRecord().partition()));
+ headers.add(ERROR_HEADER_ORIG_OFFSET, toBytes(context.consumerRecord().offset()));
+ }
+
+ headers.add(ERROR_HEADER_CONNECTOR_NAME, toBytes(connectorTaskId.connector()));
+ headers.add(ERROR_HEADER_TASK_ID, toBytes(String.valueOf(connectorTaskId.task())));
+ headers.add(ERROR_HEADER_STAGE, toBytes(context.stage().name()));
+ headers.add(ERROR_HEADER_EXECUTING_CLASS, toBytes(context.executingClass().getName()));
+ if (context.error() != null) {
+ headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
+ headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
+ byte[] trace;
+ if ((trace = stacktrace(context.error())) != null) {
+ headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
+ }
+ }
+ }
+
+    /**
+     * Render the stack trace of an error as UTF-8 encoded bytes.
+     *
+     * @param error the throwable whose stack trace should be serialized
+     * @return the printed stack trace as UTF-8 bytes, or null if it could not be serialized
+     */
+    private byte[] stacktrace(Throwable error) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        // try-with-resources closes (and thereby flushes) the PrintStream before the bytes are read.
+        try (PrintStream stream = new PrintStream(bos, true, StandardCharsets.UTF_8.name())) {
+            error.printStackTrace(stream);
+        } catch (IOException e) {
+            // The PrintStream constructor declares UnsupportedEncodingException, a subclass of IOException.
+            log.error("Could not serialize stacktrace.", e);
+            return null;
+        }
+        return bos.toByteArray();
+    }
+
+ /**
+ * Encode an int as the bytes of its decimal string form.
+ *
+ * @param value the number to encode
+ * @return the bytes produced by {@code toBytes(String)} for {@code String.valueOf(value)}
+ */
+ private byte[] toBytes(int value) {
+ return toBytes(String.valueOf(value));
+ }
+
+ /**
+ * Encode a long as the bytes of its decimal string form.
+ *
+ * @param value the number to encode
+ * @return the bytes produced by {@code toBytes(String)} for {@code String.valueOf(value)}
+ */
+ private byte[] toBytes(long value) {
+ return toBytes(String.valueOf(value));
+ }
+
+    /**
+     * Encode a string as UTF-8 bytes for use as a header value.
+     *
+     * @param value the text to encode; may be null (for example, an exception that carries no message)
+     * @return the UTF-8 encoding of {@code value}, or null when {@code value} is null
+     */
+    private byte[] toBytes(String value) {
+        if (value == null) {
+            // Throwable.getMessage() may return null; a missing message must not abort the
+            // dead letter queue write with a NullPointerException.
+            return null;
+        }
+        return value.getBytes(StandardCharsets.UTF_8);
+    }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index f35c514..f199982 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -26,6 +29,7 @@ import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.easymock.Mock;
@@ -43,8 +47,19 @@ import java.util.concurrent.Future;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_OFFSET;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_PARTITION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
@@ -81,7 +96,7 @@ public class ErrorReporterTest {
@Test
public void testDLQConfigWithEmptyTopicName() {
- DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()));
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(emptyMap()), TASK_ID);
deadLetterQueueReporter.metrics(errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -96,7 +111,7 @@ public class ErrorReporterTest {
@Test
public void testDLQConfigWithValidTopicName() {
- DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
deadLetterQueueReporter.metrics(errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -111,7 +126,7 @@ public class ErrorReporterTest {
@Test
public void testReportDLQTwice() {
- DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)));
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID);
deadLetterQueueReporter.metrics(errorHandlingMetrics);
ProcessingContext context = processingContext();
@@ -189,6 +204,64 @@ public class ErrorReporterTest {
assertEquals(configuration.dlqTopicReplicationFactor(), 7);
}
+ public void testDlqHeaderConsumerRecord() {
+ Map<String, String> props = new HashMap<>();
+ props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+ props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+ ProcessingContext context = new ProcessingContext();
+ context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+ context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+ context.error(new ConnectException("Test Exception"));
+
+ ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+ deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+ assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+ assertEquals("7", headerValue(producerRecord, ERROR_HEADER_ORIG_PARTITION));
+ assertEquals("10", headerValue(producerRecord, ERROR_HEADER_ORIG_OFFSET));
+ assertEquals(TASK_ID.connector(), headerValue(producerRecord, ERROR_HEADER_CONNECTOR_NAME));
+ assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+ assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, ERROR_HEADER_STAGE));
+ assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+ assertEquals(ConnectException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+ assertEquals("Test Exception", headerValue(producerRecord, ERROR_HEADER_EXCEPTION_MESSAGE));
+ assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+ assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception"));
+ }
+
+ @Test
+ public void testDlqHeaderIsAppended() {
+ Map<String, String> props = new HashMap<>();
+ props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+ props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+ DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(producer, config(props), TASK_ID);
+
+ ProcessingContext context = new ProcessingContext();
+ context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, "source-key".getBytes(), "source-value".getBytes()));
+ context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+ context.error(new ConnectException("Test Exception"));
+
+ ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+ producerRecord.headers().add(ERROR_HEADER_ORIG_TOPIC, "dummy".getBytes());
+
+ deadLetterQueueReporter.populateContextHeaders(producerRecord, context);
+ int appearances = 0;
+ for (Header header: producerRecord.headers()) {
+ if (ERROR_HEADER_ORIG_TOPIC.equalsIgnoreCase(header.key())) {
+ appearances++;
+ }
+ }
+
+ assertEquals("source-topic", headerValue(producerRecord, ERROR_HEADER_ORIG_TOPIC));
+ assertEquals(2, appearances);
+ }
+
+    /**
+     * Read the most recently added header with the given key and decode its value as text.
+     *
+     * @param producerRecord the record whose headers are inspected
+     * @param headerSuffix the header key to look up (callers pass the full ERROR_HEADER_* key)
+     * @return the value of the last header with that key, decoded as UTF-8
+     */
+    private String headerValue(ProducerRecord<byte[], byte[]> producerRecord, String headerSuffix) {
+        // Decode as UTF-8, the charset the reporter encodes with, rather than the platform default.
+        return new String(producerRecord.headers().lastHeader(headerSuffix).value(), java.nio.charset.StandardCharsets.UTF_8);
+    }
+
private ProcessingContext processingContext() {
ProcessingContext context = new ProcessingContext();
context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.