Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/16 04:04:09 UTC

[kafka] branch trunk updated: KAFKA-7223: Suppression documentation (#5787)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b7148a  KAFKA-7223: Suppression documentation (#5787)
4b7148a is described below

commit 4b7148a5b66d2207c4c6ef1d2729ce32ec6f8bcf
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Oct 15 23:04:01 2018 -0500

    KAFKA-7223: Suppression documentation (#5787)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 docs/ops.html                                      |  12 ++-
 docs/streams/developer-guide/dsl-api.html          | 113 +++++++++++++++++++++
 .../streams/kstream/internals/metrics/Sensors.java |   4 +-
 .../internals/KStreamWindowAggregateTest.java      |   4 +-
 4 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/docs/ops.html b/docs/ops.html
index d57f1cf..158602b 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -257,7 +257,7 @@
     </li>
   </ul>
 
-  Please note, that out of range offsets will be adjusted to available offset end. For example, if offset end is at 10 and offset shift request is 
+  Please note that out-of-range offsets will be adjusted to the available offset end. For example, if the offset end is at 10 and the offset shift request
   is for 15, then the offset at 10 will actually be selected.
 
   <p>
@@ -1546,6 +1546,16 @@ All the following metrics have a recording level of ``debug``:
         <td>The total number of commit calls. </td>
         <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
       </tr>
+      <tr>
+        <td>record-lateness-avg</td>
+        <td>The average observed lateness of records.</td>
+        <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>record-lateness-max</td>
+        <td>The max observed lateness of records.</td>
+        <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
+      </tr>
  </tbody>
 </table>
 
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index d9dd220..4b91eaa 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -58,6 +58,7 @@
                             <li><a class="reference internal" href="#hopping-time-windows" id="id21">Hopping time windows</a></li>
                             <li><a class="reference internal" href="#sliding-time-windows" id="id22">Sliding time windows</a></li>
                             <li><a class="reference internal" href="#session-windows" id="id23">Session Windows</a></li>
+                            <li><a class="reference internal" href="#window-final-results" id="id31">Window Final Results</a></li>
                         </ul>
                         </li>
                     </ul>
@@ -65,6 +66,7 @@
                     <li><a class="reference internal" href="#applying-processors-and-transformers-processor-api-integration" id="id24">Applying processors and transformers (Processor API integration)</a></li>
                 </ul>
                 </li>
+                <li><a class="reference internal" href="#controlling-emit-rate" id="id32">Controlling KTable emit rate</a></li>
                 <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
                 <li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
                 <li><a class="reference internal" href="#scala-dsl" id="id27">Kafka Streams DSL for Scala</a></li>
@@ -2969,6 +2971,73 @@ convert from minutes to milliseconds (e.g. t=5 would become t=300,000).</span></
 t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively.</span></p>
                         </div>
                     </div>
+		    <div class="section" id="window-final-results">
+			    <span id="windowing-final-results"></span><h5><a class="toc-backref" href="#id31">Window Final Results</a><a class="headerlink" href="#window-final-results" title="Permalink to this headline"></a></h5>
+			    <p>In Kafka Streams, windowed computations update their results continuously.
+			       As new data arrives for a window, freshly computed results are emitted downstream.
+			       For many applications, this is ideal, since fresh results are always available,
+			       and Kafka Streams is designed to make programming continuous computations seamless.
+			       However, some applications need to take action <strong>only</strong> on the final result of a windowed computation.
+			       Common examples of this are sending alerts or delivering results to a system that doesn't support updates.
+			    </p>
+			    <p>Suppose that you have an hourly windowed count of events per user.
+			       If you want to send an alert when a user has <em>fewer than</em> three events in an hour, you have a real challenge.
+			       All users would match this condition at first, until they accrue enough events, so you cannot simply
+			       send an alert when someone matches the condition; you have to wait until you know you won't see any more events for a particular window
+			       and <em>then</em> send the alert.
+			    </p>
+                            <p>Kafka Streams offers a clean way to define this logic: after defining your windowed computation, you can
+			       <span class="pre">suppress</span> the intermediate results, emitting the final count for each user when the window is <strong>closed</strong>.
+			    </p>
+			    <p>For example:</p>
+			    <div class="highlight-java"><div class="highlight">
+<pre>
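+// Note: this sketch assumes the following static imports
+// (the surrounding class and setup are omitted):
+// import static java.time.Duration.ofMinutes;
+// import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;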
+KGroupedStream&lt;UserId, Event&gt; grouped = ...;
+grouped
+    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
+    .count()
+    .suppress(Suppressed.untilWindowCloses(unbounded()))
+    .filter((windowedUserId, count) -&gt; count &lt; 3)
+    .toStream()
+    .foreach((windowedUserId, count) -&gt; sendAlert(windowedUserId.window(), windowedUserId.key(), count));
+</pre>
+			    </div></div>
+			    <p>The key parts of this program are:
+			    <dl>
+				    <dt><code>grace(ofMinutes(10))</code></dt>
+				    <dd>This allows us to bound the lateness of events the window will accept.
+				        For example, the 09:00 to 10:00 window will accept late-arriving records until 10:10, at which point the window is <strong>closed</strong>.
+				    </dd>
+				    <dt><code>.suppress(Suppressed.untilWindowCloses(...))</code></dt>
+				    <dd>This configures the suppression operator to emit nothing for a window until it closes, and then emit the final result.
+				        For example, if user <code>U</code> gets 10 events between 09:00 and 10:10, the <code>filter</code> downstream of the suppression
+					will get no events for the windowed key <code>U@09:00-10:00</code> until 10:10, and then it will get exactly one event with the value <code>10</code>.
+					This is the final result of the windowed count.
+				    </dd>
+				    <dt><code>unbounded()</code></dt>
+				    <dd>
+				      This configures the buffer used for storing events until their windows close.
+				      Production code should put a cap on the amount of memory to use for the buffer,
+				      but this simple example creates a buffer with no upper bound
+				      (a bounded alternative is sketched just after this list).
+				    </dd>
+			    </dl>
+			    </p>
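+			    <p>Because <code>untilWindowCloses</code> never emits an early result, a bounded buffer must
+			       name a fallback strategy of shutting down. A rough sketch of such a configuration follows;
+			       the 10MB limit is illustrative, and <code>maxBytes</code> is assumed to be statically
+			       imported from <code>Suppressed.BufferConfig</code>:
+			    </p>
+			    <div class="highlight-java"><div class="highlight">
+<pre>
+// Hypothetical sizing: cap the suppression buffer at 10MB and shut the
+// application down, rather than emit an incomplete result, if it fills up.
+grouped
+    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
+    .count()
+    .suppress(Suppressed.untilWindowCloses(maxBytes(10_000_000L).shutDownWhenFull()));
+</pre>
+			    </div></div>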
+			    <p>
+			      Note that suppression is just like any other Kafka Streams operator, so you can build a
+			      topology with two branches emerging from the <code>count</code>, one suppressed and one not,
+			      or even multiple differently configured suppressions.
+			      This allows you to apply suppression where it is needed
+			      and otherwise rely on the default continuous update behavior.
+			    </p>
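+			    <p>For illustration, a minimal sketch of such a two-branch topology might look like the
+			       following, where <code>updateDashboard</code> is a hypothetical consumer of the
+			       continuously updated counts:
+			    </p>
+			    <div class="highlight-java"><div class="highlight">
+<pre>
+KTable&lt;Windowed&lt;UserId&gt;, Long&gt; counts = grouped
+    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
+    .count();
+
+// Branch 1: continuous updates, e.g., feeding a live dashboard.
+counts.toStream()
+    .foreach((windowedUserId, count) -&gt; updateDashboard(windowedUserId, count));
+
+// Branch 2: final results only, e.g., for alerting on closed windows.
+counts.suppress(Suppressed.untilWindowCloses(unbounded()))
+    .toStream()
+    .foreach((windowedUserId, count) -&gt; sendAlert(windowedUserId.window(), windowedUserId.key(), count));
+</pre>
+			    </div></div>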
+			    <p>For more detailed information, see the JavaDoc on the <code>Suppressed</code> config object
+	                       and <a href="https://cwiki.apache.org/confluence/x/sQU0BQ" title="KIP-328">KIP-328</a>.
+			    </p>
+		    </div>
                 </div>
             </div>
             <div class="section" id="applying-processors-and-transformers-processor-api-integration">
@@ -3134,6 +3203,50 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
                 </div>
             </div>
         </div>
+	
+	<div class="section" id="controlling-emit-rate">
+            <span id="streams-developer-guide-dsl-suppression"></span><h2><a class="toc-backref" href="#id32">Controlling KTable emit rate</a><a class="headerlink" href="#controlling-emit-rate" title="Permalink to this headline"></a></h2>
+	    <p>A KTable is logically a continuously updated table.
+	       These updates make their way to downstream operators whenever new data is available, ensuring that the whole computation is as fresh as possible.
+	       Most programs describe a series of logical transformations, and the update rate is not a factor in the program behavior.
+	       In these cases, the rate of update is more of a performance concern.
+	       Operators can optimize both the network traffic (to the Kafka brokers) and the disk traffic (to the local state stores) by adjusting
+	       commit interval and batch size configurations.
+	    </p>
+	    <p>However, some applications need to take other actions, such as calling out to external systems,
+	       and therefore need to exercise some control over the rate of invocations, for example calls to <code>KStream#foreach</code>.
+	    </p>
+	    <p>Rather than achieving this as a side effect of the <a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><span class="std std-ref">KTable record cache</span></a>,
+	       you can directly impose a rate limit via the <code>KTable#suppress</code> operator.
+	    </p>
+	    <p>For example:
+	    </p>
+	    <div class="highlight-java"><div class="highlight">
+<pre>
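+// Note: this sketch assumes the following static imports:
+// import static java.time.Duration.ofMinutes;
+// import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+// import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;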
+KGroupedTable&lt;String, String&gt; groupedTable = ...;
+groupedTable
+    .count()
+    .suppress(untilTimeLimit(ofMinutes(5), maxBytes(1_000_000L).emitEarlyWhenFull()))
+    .toStream()
+    .foreach((key, count) -&gt; updateCountsDatabase(key, count));
+</pre>
+	    </div></div>
+	    <p>This configuration ensures that <code>updateCountsDatabase</code> gets events for each <code>key</code> no more than once every 5 minutes.
+	       Note that the latest state for each key has to be buffered in memory for that 5-minute period.
+	       You have the option to control the maximum amount of memory to use for this buffer (in this case, 1MB).
+	       There is also an option to impose a limit in terms of the number of records (or to leave both limits unspecified).
+	    </p>
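+	    <p>For instance, a record-count limit would look like the following sketch; the limit of 10,000
+	       records is illustrative, and <code>maxRecords</code> is assumed to be statically imported from
+	       <code>Suppressed.BufferConfig</code>:
+	    </p>
+	    <div class="highlight-java"><div class="highlight">
+<pre>
+// Buffer at most 10,000 keys instead of capping the buffer by bytes.
+groupedTable
+    .count()
+    .suppress(untilTimeLimit(ofMinutes(5), maxRecords(10_000L).emitEarlyWhenFull()));
+</pre>
+	    </div></div>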
+	    <p>Additionally, it is possible to choose what happens if the buffer fills up.
+	       This example takes a relaxed approach and just emits the oldest records before their 5-minute time limit to bring the buffer back down to size.
+	       Alternatively, you can choose to stop processing and shut the application down.
+	       This may seem extreme, but it gives you a guarantee that the 5-minute time limit will be absolutely enforced.
+	       After the application shuts down, you could allocate more memory for the buffer and resume processing.
+	       Emitting early is preferable for most applications.
+	    </p>
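+	    <p>A sketch of that strict alternative follows, reusing the illustrative 1MB cap from above:
+	    </p>
+	    <div class="highlight-java"><div class="highlight">
+<pre>
+// Enforce the 5-minute limit absolutely: if the 1MB buffer fills up,
+// stop processing rather than emit any record before its time limit.
+groupedTable
+    .count()
+    .suppress(untilTimeLimit(ofMinutes(5), maxBytes(1_000_000L).shutDownWhenFull()));
+</pre>
+	    </div></div>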
+	    <p>For more detailed information, see the JavaDoc on the <code>Suppressed</code> config object
+	       and <a href="https://cwiki.apache.org/confluence/x/sQU0BQ" title="KIP-328">KIP-328</a>.
+	    </p>
+	</div>
         <div class="section" id="writing-streams-back-to-kafka">
             <span id="streams-developer-guide-dsl-destinations"></span><h2><a class="toc-backref" href="#id25">Writing streams back to Kafka</a><a class="headerlink" href="#writing-streams-back-to-kafka" title="Permalink to this headline"></a></h2>
             <p>Any streams and tables may be (continuously) written back to a Kafka topic.  As we will describe in more detail below, the output data might be
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index a85bbb8..5b0d8b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -60,7 +60,7 @@ public class Sensors {
         sensor.add(
             new MetricName(
                 "record-lateness-avg",
-                "stream-processor-node-metrics",
+                "stream-task-metrics",
                 "The average observed lateness of records.",
                 tags),
             new Avg()
@@ -68,7 +68,7 @@ public class Sensors {
         sensor.add(
             new MetricName(
                 "record-lateness-max",
-                "stream-processor-node-metrics",
+                "stream-task-metrics",
                 "The max observed lateness of records.",
                 tags),
             new Max()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 236cd8c..1e39bd3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -390,7 +390,7 @@ public class KStreamWindowAggregateTest {
 
         final MetricName latenessMaxMetric = new MetricName(
             "record-lateness-max",
-            "stream-processor-node-metrics",
+            "stream-task-metrics",
             "The max observed lateness of records.",
             mkMap(
                 mkEntry("client-id", "topology-test-driver-virtual-thread"),
@@ -401,7 +401,7 @@ public class KStreamWindowAggregateTest {
 
         final MetricName latenessAvgMetric = new MetricName(
             "record-lateness-avg",
-            "stream-processor-node-metrics",
+            "stream-task-metrics",
             "The average observed lateness of records.",
             mkMap(
                 mkEntry("client-id", "topology-test-driver-virtual-thread"),