Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/18 00:31:34 UTC

[kafka] branch trunk updated: MINOR: provide an example for deserialization exception handler (#5231)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1546bcd  MINOR: provide an example for deserialization exception handler (#5231)
1546bcd is described below

commit 1546bcd87770fba60b9663a2dc202823a873ecb4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Jun 17 17:31:30 2018 -0700

    MINOR: provide an example for deserialization exception handler (#5231)
    
    Also added a paragraph from data types to link to the example code.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>
---
 docs/streams/developer-guide/config-streams.html | 44 ++++++++++++++++++++++--
 docs/streams/developer-guide/datatypes.html      |  5 +++
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 6bba10d..2c6bce1 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -300,8 +300,10 @@
           <span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
           <blockquote>
            <div><p>The default deserialization exception handler allows you to manage records that fail to deserialize. This
-              can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers
-              are available:</p>
+              can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The exception handler
+              implementation must return either <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown.
+              Returning <code>FAIL</code> signals that Streams should shut down, while <code>CONTINUE</code> signals that Streams should
+              ignore the issue and continue processing. The following built-in exception handlers are available:</p>
               <ul class="simple">
                 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
                   This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records.
@@ -310,6 +312,42 @@
                 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>.
                   This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.</li>
               </ul>
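+
+              <p>For example, here is a minimal sketch of registering one of the built-in handlers through the Streams
+                configuration (<code>DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code> is the <code>StreamsConfig</code>
+                constant for <code>default.deserialization.exception.handler</code>):</p>
+
+              <pre class="brush: java;">
+              import java.util.Properties;
+
+              import org.apache.kafka.streams.StreamsConfig;
+              import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+
+              final Properties props = new Properties();
+              // log corrupted records and keep processing, instead of shutting the application down
+              props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                        LogAndContinueExceptionHandler.class);
+              </pre>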
+
+              <p>Besides the built-in handlers, you can also provide your own customized exception handler to meet your needs. For example, you can choose to forward corrupt
+                records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write the corrupted record directly to
+                the quarantine topic. More concretely, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass both this object
+                and the dead letter queue topic name into the <code>Properties</code> map (under example keys such as <code>"dlq.producer"</code> and <code>"dlq.topic"</code>);
+                they can then be retrieved within the <code>configure</code> method. The drawback of this approach is that "manual" writes are side effects that are invisible
+                to the Kafka Streams runtime library, so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
+
+              <pre class="brush: java;">
+              import java.util.Map;
+              import java.util.concurrent.ExecutionException;
+
+              import org.apache.kafka.clients.consumer.ConsumerRecord;
+              import org.apache.kafka.clients.producer.KafkaProducer;
+              import org.apache.kafka.clients.producer.ProducerRecord;
+              import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+              import org.apache.kafka.streams.processor.ProcessorContext;
+              import org.slf4j.Logger;
+              import org.slf4j.LoggerFactory;
+
+              public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
+                  private static final Logger log = LoggerFactory.getLogger(SendToDeadLetterQueueExceptionHandler.class);
+
+                  KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
+                  String dlqTopic;
+
+                  @Override
+                  public DeserializationHandlerResponse handle(final ProcessorContext context,
+                                                               final ConsumerRecord&lt;byte[], byte[]&gt; record,
+                                                               final Exception exception) {
+
+                      log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
+                          "taskId: {}, topic: {}, partition: {}, offset: {}",
+                          context.taskId(), record.topic(), record.partition(), record.offset(),
+                          exception);
+
+                      try {
+                          // the partition is set to null so that the producer chooses one for the dead letter queue topic
+                          dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers())).get();
+                      } catch (final InterruptedException e) {
+                          Thread.currentThread().interrupt();
+                          return DeserializationHandlerResponse.FAIL;
+                      } catch (final ExecutionException e) {
+                          log.error("Failed to write the corrupted record to the dead letter queue", e);
+                          return DeserializationHandlerResponse.FAIL;
+                      }
+
+                      return DeserializationHandlerResponse.CONTINUE;
+                  }
+
+                  @Override
+                  public void configure(final Map&lt;String, ?&gt; configs) {
+                      // "dlq.producer" and "dlq.topic" are example keys for this sketch; they must match
+                      // the keys under which the application puts these objects into its config map
+                      dlqProducer = (KafkaProducer&lt;byte[], byte[]&gt;) configs.get("dlq.producer");
+                      dlqTopic = (String) configs.get("dlq.topic");
+                  }
+              }
+              </pre>
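+
+              <p>A minimal sketch of wiring this handler into the application, assuming the example keys
+                <code>"dlq.producer"</code> and <code>"dlq.topic"</code> from the <code>configure</code> method above
+                (the topic name is likewise just an example):</p>
+
+              <pre class="brush: java;">
+              final Properties props = new Properties();
+              // register the custom handler class; Streams instantiates it and calls configure() with this map
+              props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                        SendToDeadLetterQueueExceptionHandler.class);
+              // pass the externally created producer and the dead letter queue topic name through the config map
+              props.put("dlq.producer", dlqProducer);
+              props.put("dlq.topic", "streams-dead-letter-queue");
+              </pre>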
+
             </div></blockquote>
         </div>
         <div class="section" id="default-production-exception-handler">
@@ -329,7 +367,7 @@
             import org.apache.kafka.streams.errors.ProductionExceptionHandler;
             import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
-            class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+            public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
                 public void configure(Map&lt;String, Object&gt; config) {}
 
                 public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 1120815..a24dc4c 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -93,6 +93,11 @@
 <span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
 </pre></div>
       </div>
+      <p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
+         Since 1.0.x, Kafka Streams provides a <code>DeserializationExceptionHandler</code> interface which allows
+         you to customize how such records are handled. A customized implementation of the interface can be specified via the
+         <code>StreamsConfig</code>, as sketched below. For more details, please read the
+         <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section.
+      </p>
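+      <p>As a minimal sketch, assuming a hypothetical custom handler class named <code>MyDeserializationExceptionHandler</code>:</p>
+      <pre class="brush: java;">
+      import java.util.Properties;
+
+      import org.apache.kafka.streams.StreamsConfig;
+
+      final Properties props = new Properties();
+      props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                MyDeserializationExceptionHandler.class);
+      </pre>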
     </div>
     <div class="section" id="available-serdes">
       <span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.