Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/18 00:31:34 UTC
[kafka] branch trunk updated: MINOR: provide an example for
deserialization exception handler (#5231)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1546bcd MINOR: provide an example for deserialization exception handler (#5231)
1546bcd is described below
commit 1546bcd87770fba60b9663a2dc202823a873ecb4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Jun 17 17:31:30 2018 -0700
MINOR: provide an example for deserialization exception handler (#5231)
Also added a paragraph to the data types page that links to the example code.
Reviewers: Matthias J. Sax <mj...@apache.org>
---
docs/streams/developer-guide/config-streams.html | 44 ++++++++++++++++++++++--
docs/streams/developer-guide/datatypes.html | 5 +++
2 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 6bba10d..2c6bce1 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -300,8 +300,10 @@
<span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This
- can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers
- are available:</p>
+ can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
+ handler must return either <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown. Returning
+ <code>FAIL</code> signals that Streams should shut down, and <code>CONTINUE</code> signals that Streams should ignore the issue
+ and continue processing. The following built-in exception handlers are available:</p>
<ul class="simple">
<li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records.
@@ -310,6 +312,42 @@
<li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>.
This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.</li>
</ul>
+
+ <p>Besides the built-in handlers, you can also provide your own custom exception handler. For example, you can choose to forward corrupt
+ records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write each corrupted record directly to
+ the quarantine topic. Concretely, you can create a separate <code>KafkaProducer</code> object outside the Streams client and put this object,
+ together with the dead letter queue topic name, into the <code>Properties</code> passed to Streams; the handler can then retrieve both in its <code>configure</code> method.
+ The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
+ so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
+
+ <pre class="brush: java;">
+ public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
+ KafkaProducer<byte[], byte[]> dlqProducer;
+ String dlqTopic;
+
+ @Override
+ public DeserializationHandlerResponse handle(final ProcessorContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
+
+ log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
+ "taskId: {}, topic: {}, partition: {}, offset: {}",
+ context.taskId(), record.topic(), record.partition(), record.offset(),
+ exception);
+
+ dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers())).get();
+
+ return DeserializationHandlerResponse.CONTINUE;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ dlqProducer = .. // get a producer from the configs map
+ dlqTopic = .. // get the topic name from the configs map
+ }
+ }
+ </pre>
+
</div></blockquote>
</div>
<div class="section" id="default-production-exception-handler">
@@ -329,7 +367,7 @@
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
- class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+ public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public void configure(Map<String, ?> config) {}
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
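
The dead letter queue handler added in the config-streams.html hunk leaves the configure() body elided, refers to a log field it never declares, and calls the blocking get() without handling its checked InterruptedException and ExecutionException. A minimal self-contained sketch of the same approach follows, assuming the kafka-streams 1.x/2.x API in which handle() receives a ProcessorContext (newer releases may change this signature). The config keys dlq.producer and dlq.topic are hypothetical; the field is typed as the Producer interface rather than KafkaProducer so that a test double such as MockProducer can be plugged in; logging uses SLF4J, which kafka-clients already depends on.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

    // Hypothetical config keys: any names work, as long as the application
    // puts the same keys into the Properties it passes to KafkaStreams.
    public static final String DLQ_PRODUCER_CONFIG = "dlq.producer";
    public static final String DLQ_TOPIC_CONFIG = "dlq.topic";

    private static final Logger log =
        LoggerFactory.getLogger(SendToDeadLetterQueueExceptionHandler.class);

    private Producer<byte[], byte[]> dlqProducer;
    private String dlqTopic;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(final Map<String, ?> configs) {
        // Objects put into the Streams Properties are handed to configure() as-is,
        // so cast them back to their real types.
        dlqProducer = (Producer<byte[], byte[]>) configs.get(DLQ_PRODUCER_CONFIG);
        dlqTopic = (String) configs.get(DLQ_TOPIC_CONFIG);
        if (dlqProducer == null || dlqTopic == null) {
            throw new ConfigException("Both " + DLQ_PRODUCER_CONFIG + " and "
                                      + DLQ_TOPIC_CONFIG + " must be set");
        }
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        log.warn("Exception caught during deserialization, sending to the dead letter queue topic; "
                     + "topic: {}, partition: {}, offset: {}",
                 record.topic(), record.partition(), record.offset(), exception);

        // ProducerRecord rejects negative timestamps, so map "no timestamp" (-1) to null.
        final Long timestamp = record.timestamp() >= 0 ? record.timestamp() : null;
        try {
            // Forward the raw key/value bytes and headers unchanged;
            // a null partition lets the producer pick one.
            dlqProducer.send(new ProducerRecord<>(dlqTopic, null, timestamp,
                                                  record.key(), record.value(),
                                                  record.headers())).get();
            return DeserializationHandlerResponse.CONTINUE;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return DeserializationHandlerResponse.FAIL;
        } catch (final ExecutionException e) {
            // The record could not be quarantined: stop instead of dropping it silently.
            log.error("Failed to write record to dead letter queue topic {}", dlqTopic, e);
            return DeserializationHandlerResponse.FAIL;
        }
    }
}
```

To wire it up, put the producer object and the topic name under those two keys in the Properties passed to KafkaStreams, and set default.deserialization.exception.handler to this class. Returning FAIL when the dead letter write itself fails is a judgment call: it avoids silently losing the record, at the cost of stopping processing.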
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 1120815..a24dc4c 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -93,6 +93,11 @@
<span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"RegionCountsTopic"</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
</pre></div>
</div>
+ <p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
+ Since 1.0, Kafka Streams provides a <code>DeserializationExceptionHandler</code> interface that allows
+ you to customize how such records are handled. Your implementation of the interface can be specified via <code>StreamsConfig</code>.
+ For more details, see the <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section.
+ </p>
</div>
<div class="section" id="available-serdes">
<span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>
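
The paragraph added to datatypes.html says the handler is specified via StreamsConfig. A minimal sketch of that registration follows, using the built-in LogAndContinueExceptionHandler. The application id and bootstrap address are placeholders, and the constant is the 1.x/2.x-era StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, which later releases may rename or deprecate.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfigExample {

    // Builds the Properties for a Kafka Streams application that logs and then
    // skips any record its deserializers cannot handle.
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        // Placeholder values; substitute your own application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Equivalent to the config-file entry
        // default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```

A custom handler is registered the same way, by passing its class (or its fully qualified class name) instead of the built-in one.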