Posted to users@kafka.apache.org by Ahmad Alkilani <ah...@dreamworks.com> on 2017/10/17 18:47:30 UTC

KTable Tombstone and expiry of records in Session Window

A few questions here. First, the environment:
Scala, Kafka 0.11.0.0

I have a KTable[Windowed[String], MyType].
This is then transformed to a stream via
.toStream[String]((k, _) => k.key())

The KTable is the result of a reduce operation with a SessionWindow whose
inactivity gap is defined as 10 minutes and whose retention (until) is
defined as 11 minutes.
The timestamp extractor used is WallclockTimestampExtractor.

My understanding is that "stream time" progresses loosely based on event
time, which here is defined as wall-clock time. (I understand the
implication that stream time progresses based on the minimum time across
all partitions, so even though it's defined as wall-clock time, it still
depends to some degree on data coming in on all partitions to advance
stream time.) With that understanding:

   1. Given SessionTime can continue to expand the window that is
   considered part of the same session, i.e., it's based on data arriving
   for that key, what happens with retention time? I've seen online
   definitions that seem to define the expiry of records due to retention
   as StreamTime - Retention time. Is this correct, and does it always hold
   true even if the Session continues to expand due to recent activity for
   a key? The gist of the question here: does the retention time/expiry
   calculation take session window expansions into consideration?
   2. In the scenario described above with the KTable.toStream I am getting
   Tombstone records; i.e., records with a Key and Null value. Are these to be
   expected? (My assumption is Yes). Are these a result of "expiry" based on
   retention period?
   3. Can I rely on these "Tombstone" records to indicate expiry from the
   session store?


This ultimately boils down to understanding windows better, but also to
trying to establish a proxy for indicating when a window expires, as Kafka
Streams doesn't seem to support this yet. With that said, are there any
plans to support an indicator that tells downstream nodes that a message in
a window has expired, even if this is done in batch? It seems expiry
actually happens at the RocksDB segment level, assuming default state
stores.

Thanks!

-- 
Ahmad Alkilani

Re: KTable Tombstone and expiry of records in Session Window

Posted by Damian Guy <da...@gmail.com>.
Hi Ahmad,


>    1. Given SessionTime can continue to expand the window that is
>    considered part of the same session, i.e., it's based on data
>    arriving for that key. What happens with retention time?


As the session expands the data for the session will continue to be
retained as it is still active. The session window uses the endTime of the
window to determine its retention policy.
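
As a rough illustration of retention following the session's end time, here
is a minimal sketch in plain Scala. It assumes a session becomes eligible
for removal once its end time falls more than the retention period behind
stream time. The name RetentionSketch.stillRetained and the rule itself are
assumptions for illustration, not the store's actual code; real stores drop
whole segments, so actual removal can happen later than this.

```scala
object RetentionSketch {
  // Hypothetical helper: true while the session's end time is still within
  // the retention period relative to stream time (all values in ms).
  def stillRetained(sessionEnd: Long, streamTime: Long, retentionMs: Long): Boolean =
    sessionEnd >= streamTime - retentionMs

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val retention = 11 * minute

    // Session that ended at t=0 and saw no further activity.
    println(stillRetained(0L, 12 * minute, retention))

    // Same key, but later records extended the session end to t=9 minutes.
    println(stillRetained(9 * minute, 12 * minute, retention))
  }
}
```

Under this assumption, a session that keeps expanding keeps pushing its end
time forward, so it stays retained for as long as it is active.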


>    I've seen online definitions that seem to define the expiry of
>    records due to retention as StreamTime - Retention time. Is this
>    correct, and does it always hold true even if the Session continues
>    to expand due to recent activity for a key? The gist of the question
>    here: does the retention time/expiry calculation take session window
>    expansions into consideration?
>
>

Yes


>    2. In the scenario described above with the KTable.toStream I am
>    getting Tombstone records; i.e., records with a Key and Null value.
>    Are these to be expected? (My assumption is Yes). Are these a result
>    of "expiry" based on retention period?
>

The tombstones are expected, but they are not sent because the session has
expired. The tombstones are sent when sessions merge to form a larger
session, so a tombstone indicates that the previous session is no longer
valid. For example, if you have 2 sessions and an inactivity gap of 5:

key=1 start=0 end=0
key=1 start=6 end=6

and then you get another record for the same key at time 3 then the 2
sessions above would be merged into:

key=1 start=0 end=6

and we'd send tombstones for the two original sessions:
key=1 start=0 end=0
key=1 start=6 end=6


>    3. Can I rely on these "Tombstone" records to indicate expiry from the
>    session store?
>
>
We don't send tombstones when the sessions have expired. It is the same as
for any other type of window. We just send the updates to the windows as
they arrive.


>
> This ultimately boils down to understanding Windows better but also towards
> trying to establish a proxy for indicating when a window expires as Kafka
> Streams doesn't seem to support this yet. With that said, any plans on
> supporting an indicator that tells downstream nodes that a message in a
> Window has expired, even if this is done in batch as it seems expiry is
> actually on the rocks-db segment level assuming default state stores.
>
>
A window doesn't really ever expire; it is just retained for a period of
time. This is to allow for late-arriving data. When the RocksDB segment is
dropped, that means the retention time has passed.


> Thanks!
>
> --
> Ahmad Alkilani
>