Posted to commits@kafka.apache.org by da...@apache.org on 2018/01/08 11:33:27 UTC
[kafka] branch trunk updated: MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
This is an automated email from the ASF dual-hosted git repository.
damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e1c5d0c MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
e1c5d0c is described below
commit e1c5d0c119b38a9ddb2b09b6309a3817d86d8e14
Author: Matt Farmer <ma...@frmr.me>
AuthorDate: Mon Jan 8 06:33:23 2018 -0500
MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
* Update streams documentation to describe production exception handler
* Add a mention of the ProductionExceptionHandler in the upgrade guide
---
docs/streams/developer-guide/config-streams.html | 76 +++++++++++++++++++-----
docs/streams/upgrade-guide.html | 9 ++-
2 files changed, 68 insertions(+), 17 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index dbac7fb..256cc18 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -69,6 +69,7 @@
</li>
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a><ul>
<li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
+ <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
@@ -216,77 +217,82 @@
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailExceptionHandler</span></code></td>
</tr>
- <tr class="row-even"><td>key.serde</td>
+ <tr class="row-even"><td>default.production.exception.handler</td>
+ <td>Medium</td>
+ <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
+ <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
+ </tr>
+ <tr class="row-odd"><td>key.serde</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also value.serde).</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
</tr>
- <tr class="row-odd"><td>metric.reporters</td>
+ <tr class="row-even"><td>metric.reporters</td>
<td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td>
</tr>
- <tr class="row-even"><td>metrics.num.samples</td>
+ <tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td>
<td>2</td>
</tr>
- <tr class="row-odd"><td>metrics.recording.level</td>
+ <tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td>
<td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr>
- <tr class="row-even"><td>metrics.sample.window.ms</td>
+ <tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time a metrics sample is computed over.</td>
<td>30000 milliseconds</td>
</tr>
- <tr class="row-odd"><td>num.standby.replicas</td>
+ <tr class="row-even"><td>num.standby.replicas</td>
<td>Medium</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td>0</td>
</tr>
- <tr class="row-even"><td>num.stream.threads</td>
+ <tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td>
<td>1</td>
</tr>
- <tr class="row-odd"><td>partition.grouper</td>
+ <tr class="row-even"><td>partition.grouper</td>
<td>Low</td>
<td colspan="2">Partition grouper class that implements the <code class="docutils literal"><span class="pre">PartitionGrouper</span></code> interface.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-partition-grouper"><span class="std std-ref">Partition Grouper</span></a></td>
</tr>
- <tr class="row-even"><td>poll.ms</td>
+ <tr class="row-odd"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td>100 milliseconds</td>
</tr>
- <tr class="row-odd"><td>replication.factor</td>
+ <tr class="row-even"><td>replication.factor</td>
<td>High</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.</td>
<td>1</td>
</tr>
- <tr class="row-even"><td>state.cleanup.delay.ms</td>
+ <tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td>6000000 milliseconds</td>
</tr>
- <tr class="row-odd"><td>state.dir</td>
+ <tr class="row-even"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/var/lib/kafka-streams</span></code></td>
</tr>
- <tr class="row-even"><td>timestamp.extractor</td>
+ <tr class="row-odd"><td>timestamp.extractor</td>
<td>Medium</td>
<td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
</tr>
- <tr class="row-odd"><td>value.serde</td>
+ <tr class="row-even"><td>value.serde</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also key.serde).</td>
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
</tr>
- <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+ <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds = 1 day</td>
@@ -309,6 +315,44 @@
</ul>
</div></blockquote>
</div>
+ <div class="section" id="default-production-exception-handler">
+ <span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">default.production.exception.handler</a><a class="headerlink" href="#default-production-exception-handler" title="Permalink to this headline"></a></h4>
+ <blockquote>
+ <div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker,
+ such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/4.0.0/streams/javadocs/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>,
+ which always fails when these exceptions occur.</p>
+
+ <p>Each exception handler can return <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown. Returning <code>FAIL</code> signals that Streams should shut down, while returning <code>CONTINUE</code> signals that Streams
+ should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
+
+ <pre class="brush: java;">
+ import java.util.Map;
+ import java.util.Properties;
+ import org.apache.kafka.clients.producer.ProducerRecord;
+ import org.apache.kafka.streams.StreamsConfig;
+ import org.apache.kafka.common.errors.RecordTooLargeException;
+ import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+ import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
+
+ class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+     @Override
+     public void configure(Map<String, ?> configs) {}
+
+     @Override
+     public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                                      final Exception exception) {
+         if (exception instanceof RecordTooLargeException) {
+             // Drop the oversized record and keep processing
+             return ProductionExceptionHandlerResponse.CONTINUE;
+         } else {
+             return ProductionExceptionHandlerResponse.FAIL;
+         }
+     }
+ }
+
+ Properties settings = new Properties();
+
+ // other required Kafka Streams settings, e.g. bootstrap servers, application ID, etc.
+
+ settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+              IgnoreRecordTooLargeHandler.class);</pre></div>
+ </blockquote>
+ </div>
<div class="section" id="default-key-serde">
<h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
<blockquote>
@@ -714,4 +758,4 @@
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
- </script>
\ No newline at end of file
+ </script>
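[Editor's note] The example in the diff above requires the Kafka Streams classes on the classpath. As a dependency-free illustration of the same FAIL/CONTINUE decision logic, here is a minimal sketch in which `Response` and `OversizedRecordException` are hypothetical stand-ins for Kafka's `ProductionExceptionHandlerResponse` and `RecordTooLargeException`:

```java
// Sketch only: Response and OversizedRecordException are stand-ins for the
// real Kafka types; the branching mirrors IgnoreRecordTooLargeHandler above.
class ProductionDecisionSketch {
    enum Response { CONTINUE, FAIL }

    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException
    static class OversizedRecordException extends RuntimeException {}

    static Response handle(Exception exception) {
        // Ignore only the "record too large" case; fail on everything else
        if (exception instanceof OversizedRecordException) {
            return Response.CONTINUE;
        }
        return Response.FAIL;
    }
}
```

The point of the pattern is that the handler inspects the exception type (and, in the real interface, the failed record) and returns an enum value that tells the Streams runtime whether to keep going or shut down.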
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index a878331..b218ba8 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -70,6 +70,13 @@
<li> added <code>getAdminClient(config)</code> that allows to override an <code>AdminClient</code> used for administrative requests such as internal topic creations, etc. </li>
</ul>
+ <p>New error handling for exceptions during production:</p>
+ <ul>
+ <li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exceptions occur while trying to produce records.</li>
+ <li>provided a default implementation, <code>DefaultProductionExceptionHandler</code>, that always fails, preserving the existing behavior.</li>
+ <li>which implementation is used can be changed by setting <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
+ </ul>
+
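[Editor's note] The last bullet above configures the handler by its fully qualified class name under the key `default.production.exception.handler` (the string behind `StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`). A minimal, dependency-free sketch, in which the class name `com.example.IgnoreRecordTooLargeHandler` is hypothetical:

```java
import java.util.Properties;

class HandlerConfigSketch {
    // Registers a handler class by fully qualified name under the Streams
    // config key; com.example.IgnoreRecordTooLargeHandler is a placeholder.
    static Properties withProductionHandler(String handlerClassName) {
        Properties settings = new Properties();
        settings.put("default.production.exception.handler", handlerClassName);
        return settings;
    }
}
```

In a real application these properties would be passed to the `KafkaStreams` constructor along with the usual bootstrap-servers and application-id settings.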
<h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
<p>
@@ -260,7 +267,7 @@
If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
-
+
</p>
<p> Producer's <code>client.id</code> naming schema: </p>
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].