Posted to commits@kafka.apache.org by rh...@apache.org on 2020/07/01 15:19:40 UTC

[kafka] branch 2.6 updated: KAFKA-10153: Error Reporting in Connect Documentation (#8858)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new a677522  KAFKA-10153: Error Reporting in Connect Documentation (#8858)
a677522 is described below

commit a677522d0fc3fda37e410b0622e72bfe63ce25b9
Author: Aakash Shah <as...@confluent.io>
AuthorDate: Wed Jul 1 08:18:16 2020 -0700

    KAFKA-10153: Error Reporting in Connect Documentation (#8858)
    
    Added a section about error reporting in Connect documentation, and another about how to safely use the new errant record reporter in SinkTask implementations.
    
    Author: Aakash Shah <as...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 docs/connect.html | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/docs/connect.html b/docs/connect.html
index 18ab5fb..797c1fe 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -327,6 +327,48 @@
         <li><code>GET /</code>- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to.
     </ul>
 
+    <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reporting in Connect</a></h4>
+
+    <p>Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capt [...]
+
+    <p>To log errors that occur within a connector's converter, its transforms, or the sink connector itself, set <code>errors.log.enable=true</code> in the connector configuration. This logs the details of each error along with the problem record's topic, partition, and offset. For additional debugging, set <code>errors.log.include.messages=true</code> to also log the problem record's key, value, and headers (note that this may log sensitive information).</p>
+
+    <p>To report errors within a connector's converter, transforms, or within the sink connector itself to a dead letter queue topic, set <code>errors.deadletterqueue.topic.name</code>, and optionally <code>errors.deadletterqueue.context.headers.enable=true</code>.</p>
+
+    <p>By default, connectors exhibit "fail fast" behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties with their defaults to a connector configuration:</p>
+
+    <pre class="brush: text;">
+        # disable retries on failure
+        errors.retry.timeout=0
+
+        # do not log errors and their contexts
+        errors.log.enable=false
+
+        # do not record errors in a dead letter queue topic
+        errors.deadletterqueue.topic.name=
+
+        # Fail on first error
+        errors.tolerance=none
+    </pre>
+
+    <p>These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging to both the application logs and the <code>my-connector-errors</code> Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:</p>
+
+    <pre class="brush: text;">
+        # retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
+        errors.retry.timeout=600000
+        errors.retry.delay.max.ms=30000
+
+        # log error context along with application logs, but do not include configs and messages
+        errors.log.enable=true
+        errors.log.include.messages=false
+
+        # produce error context into the Kafka topic
+        errors.deadletterqueue.topic.name=my-connector-errors
+
+        # Tolerate all errors.
+        errors.tolerance=all
+    </pre>
+
     <h3><a id="connect_development" href="#connect_development">8.3 Connector Development Guide</a></h3>
 
     <p>This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.</p>
@@ -498,6 +540,42 @@
     <p>The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide ex [...]
     delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.</p>
 
+    <h5><a id="connect_errantrecordreporter" href="#connect_errantrecordreporter">Errant Record Reporter</a></h5>
+
+    <p>When <a href="#connect_errorreporting">error reporting</a> is enabled for a connector, the connector can use an <code>ErrantRecordReporter</code> to report problems with individual records sent to a sink connector. The following example shows how a connector's <code>SinkTask</code> subclass might obtain and use the <code>ErrantRecordReporter</code>, safely handling a null reporter when the DLQ is not enabled or when the connector is installed in an older Connect runtime that doesn [...]
+
+    <pre class="brush: java;">
+        private ErrantRecordReporter reporter;
+
+        @Override
+        public void start(Map&lt;String, String&gt; props) {
+            ...
+            try {
+                reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
+            } catch (NoSuchMethodError | NoClassDefFoundError e) {
+                // Will occur in Connect runtimes earlier than 2.6
+                reporter = null;
+            }
+        }
+
+        @Override
+        public void put(Collection&lt;SinkRecord&gt; records) {
+            for (SinkRecord record: records) {
+                try {
+                    // attempt to process and send record to data sink
+                    process(record);
+                } catch(Exception e) {
+                    if (reporter != null) {
+                        // Send errant record to error reporter
+                        reporter.report(record, e);
+                    } else {
+                        // There's no error reporter, so fail
+                        throw new ConnectException("Failed on record", e);
+                    }
+                }
+            }
+        }
+    </pre>
 
     <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Offsets</a></h5>