You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/09/11 04:55:58 UTC

[kafka] branch trunk updated: KAFKA-5636: Add Sliding Windows documentation (#9264)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8260d7c  KAFKA-5636: Add Sliding Windows documentation (#9264)
8260d7c is described below

commit 8260d7cdfbe30250e8bf4079c8f0734e1b5a203b
Author: leah <lt...@confluent.io>
AuthorDate: Thu Sep 10 23:55:11 2020 -0500

    KAFKA-5636: Add Sliding Windows documentation (#9264)
    
    Add necessary documentation for KIP-450, adding sliding window aggregations to KStreams
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 docs/images/streams-sliding-windows.png   | Bin 0 -> 38991 bytes
 docs/streams/developer-guide/dsl-api.html |  52 ++++++++++++++++++++++++++----
 docs/streams/upgrade-guide.html           |   5 +++
 3 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/docs/images/streams-sliding-windows.png b/docs/images/streams-sliding-windows.png
new file mode 100644
index 0000000..fa6d5c3
Binary files /dev/null and b/docs/images/streams-sliding-windows.png differ
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 9e26e2c..2eac368 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1103,6 +1103,14 @@
         <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</ [...]
         <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
 
+<span class="c1">// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)</span>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SlidingWindows</span><span class="o">.</span><span cla [...]
+    <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
+        <span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
+        <span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
+        <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</ [...]
+        <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
+
 <span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span clas [...]
     <span class="n">aggregate</span><span class="o">(</span>
@@ -1221,6 +1229,11 @@
     <span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based window */</span>
     <span class="o">.</span><span class="na">count</span><span class="o">();</span>
 
+<span class="c1">// Counting a KGroupedStream with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)</span>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
+    <span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">),</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)))</span> <span class="cm">/* time-b [...]
+    <span class="o">.</span><span class="na">count</span><span class="o">();</span>
+
 <span class="c1">// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
     <span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
@@ -1343,6 +1356,13 @@
     <span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
   <span class="o">);</span>
 
+<span class="c1">// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)</span>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
+  <span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">),</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)))</span> <span class="cm">/* time-bas [...]
+  <span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
+  <span class="o">);</span>
+
 <span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionzedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
   <span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
@@ -3286,13 +3306,33 @@ become t=300,000).</span></p>
                     <div class="section" id="sliding-time-windows">
                         <span id="windowing-sliding"></span><h5><a class="toc-backref" href="#id22">Sliding time windows</a><a class="headerlink" href="#sliding-time-windows" title="Permalink to this headline"></a></h5>
                         <p>Sliding windows are actually quite different from hopping and tumbling windows.  In Kafka Streams, sliding windows
-                            are used only for <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a>, and can be specified through the
-                            <code class="docutils literal"><span class="pre">JoinWindows</span></code> class.</p>
-                        <p>A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are
+                            are used for <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a>, specified by using the
+                            <code class="docutils literal"><span class="pre">JoinWindows</span></code> class, and windowed aggregations, specified by using the <code class="docutils literal"><span class="pre">SlidingWindows</span></code> class.</p>
+                        <p>A sliding window models a fixed-size window that slides continuously over the time axis. In this model, two data records are
                             said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is
-                            within the window size.  Thus, sliding windows are not aligned to the epoch, but to the data record timestamps.  In
-                            contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are
-                            <em>both inclusive</em>.</p>
+                            within the window size. As a sliding window moves along the time axis, records may fall into multiple snapshots of
+                            the sliding window, but each unique combination of records appears only in one sliding window snapshot.</p>
+                        <p>The following code defines a sliding window with a time difference of 10 minutes and a grace period of 30 minutes:</p>
+                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.SlidingWindows</span><span class="o">;</span>
+
+<span class="c1">// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes</span>
+<span class="kt">Duration</span> <span class="n">timeDifference</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">10</span><span class="o">);</span>
+<span class="kt">Duration</span> <span class="n">gracePeriod</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">);</span>
+<span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">timeDifference</span><span class="o">,</span> <span class="n">gracePeriod</span><span class="o">);</span>
+</pre></div>
+                         <div class="admonition note">
+                             <p><b>Note</b></p>
+                             <p>Sliding windows <em>require</em> that you set a grace period, as shown above. For time windows and session windows,
+                                 setting the grace period is optional and defaults to 24 hours.</p>
+                         </div>
+                         <div class="figure align-center" id="id35">
+                             <img class="centered" src="/{{version}}/images/streams-sliding-windows.png" alt="Diagram of a stream of data records grouped into overlapping sliding window snapshots">
+                             <p class="caption"><span class="caption-text">This diagram shows windowing a stream of data records with sliding windows. The overlap of
+                                 the sliding window snapshots varies depending on the record times. In this diagram, the time numbers represent milliseconds. For example,
+                                 t=5 means &#8220;at the five millisecond mark&#8221;.</span></p>
+                         </div>
+                         <p>Sliding windows are aligned to the data record timestamps, not to the epoch. In contrast to hopping and tumbling windows,
+                             the lower and upper window time interval bounds of sliding windows are both inclusive.</p>
                     </div>
                     <div class="section" id="session-windows">
                         <span id="windowing-session"></span><h5><a class="toc-backref" href="#id23">Session Windows</a><a class="headerlink" href="#session-windows" title="Permalink to this headline"></a></h5>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index bac7320..05218ef 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -110,6 +110,11 @@
         <code>delivery.timeout.ms</code> and <code>max.block.ms</code> for the producer and
         <code>default.api.timeout.ms</code> for the admin client.
     </p>
+    <p>
+        We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL">KIP-450</a>.
+        Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
+    </p>
 
     <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
     <p>