Posted to commits@kafka.apache.org by da...@apache.org on 2018/01/08 11:33:27 UTC

[kafka] branch trunk updated: MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)

This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1c5d0c  MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
e1c5d0c is described below

commit e1c5d0c119b38a9ddb2b09b6309a3817d86d8e14
Author: Matt Farmer <ma...@frmr.me>
AuthorDate: Mon Jan 8 06:33:23 2018 -0500

    MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395)
    
    * Update streams documentation to describe production exception handler
    
    * Add a mention of the ProductionExceptionHandler in the upgrade guide
---
 docs/streams/developer-guide/config-streams.html | 76 +++++++++++++++++++-----
 docs/streams/upgrade-guide.html                  |  9 ++-
 2 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index dbac7fb..256cc18 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -69,6 +69,7 @@
           </li>
           <li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a><ul>
             <li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
+            <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
             <li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
             <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
             <li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
@@ -216,77 +217,82 @@
             <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
             <td>30000 milliseconds</td>
           </tr>
-          <tr class="row-even"><td>key.serde</td>
+          <tr class="row-even"><td>default.production.exception.handler</td>
+            <td>Medium</td>
+            <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
+            <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
+          </tr>
+          <tr class="row-odd"><td>key.serde</td>
             <td>Medium</td>
             <td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also value.serde).</td>
             <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
           </tr>
-          <tr class="row-odd"><td>metric.reporters</td>
+          <tr class="row-even"><td>metric.reporters</td>
             <td>Low</td>
             <td colspan="2">A list of classes to use as metrics reporters.</td>
             <td>the empty list</td>
           </tr>
-          <tr class="row-even"><td>metrics.num.samples</td>
+          <tr class="row-odd"><td>metrics.num.samples</td>
             <td>Low</td>
             <td colspan="2">The number of samples maintained to compute metrics.</td>
             <td>2</td>
           </tr>
-          <tr class="row-odd"><td>metrics.recording.level</td>
+          <tr class="row-even"><td>metrics.recording.level</td>
             <td>Low</td>
             <td colspan="2">The highest recording level for metrics.</td>
             <td><code class="docutils literal"><span class="pre">INFO</span></code></td>
           </tr>
-          <tr class="row-even"><td>metrics.sample.window.ms</td>
+          <tr class="row-odd"><td>metrics.sample.window.ms</td>
             <td>Low</td>
             <td colspan="2">The window of time a metrics sample is computed over.</td>
             <td>30000 milliseconds</td>
           </tr>
-          <tr class="row-odd"><td>num.standby.replicas</td>
+          <tr class="row-even"><td>num.standby.replicas</td>
             <td>Medium</td>
             <td colspan="2">The number of standby replicas for each task.</td>
             <td>0</td>
           </tr>
-          <tr class="row-even"><td>num.stream.threads</td>
+          <tr class="row-odd"><td>num.stream.threads</td>
             <td>Medium</td>
             <td colspan="2">The number of threads to execute stream processing.</td>
             <td>1</td>
           </tr>
-          <tr class="row-odd"><td>partition.grouper</td>
+          <tr class="row-even"><td>partition.grouper</td>
             <td>Low</td>
             <td colspan="2">Partition grouper class that implements the <code class="docutils literal"><span class="pre">PartitionGrouper</span></code> interface.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-partition-grouper"><span class="std std-ref">Partition Grouper</span></a></td>
           </tr>
-          <tr class="row-even"><td>poll.ms</td>
+          <tr class="row-odd"><td>poll.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
             <td>100 milliseconds</td>
           </tr>
-          <tr class="row-odd"><td>replication.factor</td>
+          <tr class="row-even"><td>replication.factor</td>
             <td>High</td>
             <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.</td>
             <td>1</td>
           </tr>
-          <tr class="row-even"><td>state.cleanup.delay.ms</td>
+          <tr class="row-odd"><td>state.cleanup.delay.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
             <td>6000000 milliseconds</td>
           </tr>
-          <tr class="row-odd"><td>state.dir</td>
+          <tr class="row-even"><td>state.dir</td>
             <td>High</td>
             <td colspan="2">Directory location for state stores.</td>
             <td><code class="docutils literal"><span class="pre">/var/lib/kafka-streams</span></code></td>
           </tr>
-          <tr class="row-even"><td>timestamp.extractor</td>
+          <tr class="row-odd"><td>timestamp.extractor</td>
             <td>Medium</td>
             <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td>
             <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
           </tr>
-          <tr class="row-odd"><td>value.serde</td>
+          <tr class="row-even"><td>value.serde</td>
             <td>Medium</td>
             <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also key.serde).</td>
             <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
           </tr>
-          <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+          <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
             <td>Low</td>
             <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
             <td>86400000 milliseconds = 1 day</td>
@@ -309,6 +315,46 @@
               </ul>
             </div></blockquote>
         </div>
+        <div class="section" id="default-production-exception-handler">
+          <span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">default.production.exception.handler</a><a class="headerlink" href="#default-production-exception-handler" title="Permalink to this headline"></a></h4>
+          <blockquote>
+            <div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker,
+              such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/4.0.0/streams/javadocs/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>,
+              which always fails when these exceptions occur.</p>
+
+            <p>Each exception handler returns either <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown. Returning <code>FAIL</code> signals that Streams should shut down; returning <code>CONTINUE</code> signals that Streams
+            should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
+
+            <pre class="brush: java;">
+            import java.util.Map;
+            import java.util.Properties;
+            import org.apache.kafka.clients.producer.ProducerRecord;
+            import org.apache.kafka.common.errors.RecordTooLargeException;
+            import org.apache.kafka.streams.StreamsConfig;
+            import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+            import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
+
+            class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+                public void configure(Map&lt;String, ?&gt; config) {}
+
+                public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
+                                                                 final Exception exception) {
+                    if (exception instanceof RecordTooLargeException) {
+                        return ProductionExceptionHandlerResponse.CONTINUE;
+                    } else {
+                        return ProductionExceptionHandlerResponse.FAIL;
+                    }
+                }
+            }
+
+            Properties settings = new Properties();
+
+            // other various kafka streams settings, e.g. bootstrap servers, application id, etc
+
+            settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                         IgnoreRecordTooLargeHandler.class);</pre></div>
+          </blockquote>
+        </div>
         <div class="section" id="default-key-serde">
           <h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
           <blockquote>
@@ -714,4 +758,4 @@
                         // Display docs subnav items
                         $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
                     });
-              </script>
\ No newline at end of file
+              </script>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index a878331..b218ba8 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -70,6 +70,13 @@
         <li> added <code>getAdminClient(config)</code> that allows to override an <code>AdminClient</code> used for administrative requests such as internal topic creations, etc. </li>
     </ul>
 
+    <p>New error handling for exceptions during production:</p>
+    <ul>
+        <li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exceptions occur while trying to produce.</li>
+        <li>provided an implementation, <code>DefaultProductionExceptionHandler</code>, that always fails, preserving the existing behavior by default.</li>
+        <li>changing which implementation is used can be done by setting <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
+    </ul>
+
     <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
 
     <p>
@@ -260,7 +267,7 @@
         If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
         In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
         Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
-    
+
     </p>
 
     <p> Producer's <code>client.id</code> naming schema: </p>
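
Note (not part of the commit): the upgrade-guide text in this diff says the handler can be selected by setting default.production.exception.handler to a fully qualified class name. A minimal sketch of that string-based form follows, using only java.util.Properties so it runs without Kafka on the classpath. The class name com.example.IgnoreRecordTooLargeHandler, the application id, and the broker address are hypothetical placeholders, and ProductionHandlerConfigSketch is an illustrative wrapper rather than anything in the Kafka code base.

```java
import java.util.Properties;

public class ProductionHandlerConfigSketch {
    // Config key as named in the upgrade-guide text of this commit.
    static final String HANDLER_KEY = "default.production.exception.handler";

    // Builds Streams settings that select a handler by fully qualified class name.
    static Properties buildSettings(String handlerClassName) {
        Properties settings = new Properties();
        // Placeholder values; a real application supplies its own.
        settings.put("application.id", "example-app");
        settings.put("bootstrap.servers", "localhost:9092");
        // Hypothetical handler class, passed as a string instead of a Class object.
        settings.put(HANDLER_KEY, handlerClassName);
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = buildSettings("com.example.IgnoreRecordTooLargeHandler");
        System.out.println(HANDLER_KEY + "=" + settings.getProperty(HANDLER_KEY));
    }
}
```

The string form suits settings loaded from a properties file; the documentation example in the diff passes the Class object through the StreamsConfig constant instead.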
