Posted to dev@kafka.apache.org by Michal Borowiecki <mi...@openbet.com> on 2017/06/16 09:38:40 UTC

Re: Kafka Streams vs Spark Streaming : reduce by window

I wonder if it's a frequent enough use case that Kafka Streams should 
consider providing this out of the box - this was asked for multiple 
times, right?

Personally, I agree totally with the philosophy of "no final 
aggregation", as expressed by Eno's post, but IMO that is predicated 
totally on event-time semantics.

If users want processing-time semantics then, as the docs already point 
out, there is no such thing as a late-arriving record - every record 
just falls in the currently open window(s), hence the notion of final 
aggregation makes perfect sense, from the usability point of view.
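
To make that concrete, here is a minimal sketch in plain Python (not Kafka Streams code) of how the same records land in 5-second tumbling windows under the two semantics. The record layout, the timestamps, and the helper names window_start and assign_windows are assumptions made up for illustration.

```python
def window_start(timestamp_ms, size_ms):
    """Start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def assign_windows(records, size_ms, use_processing_time):
    """Map each record to a window start, keyed on arrival or event time."""
    assignment = []
    for value, event_ms, arrival_ms in records:
        ts = arrival_ms if use_processing_time else event_ms
        assignment.append((value, window_start(ts, size_ms)))
    return assignment


# Hypothetical data: (value, event time, arrival time) in ms.
# The third record is delayed and arrives after its event-time window ended.
RECORDS = [(21, 1000, 1100), (25, 2000, 2100), (22, 4000, 6200), (20, 6000, 6300)]

by_event = assign_windows(RECORDS, 5000, use_processing_time=False)
by_arrival = assign_windows(RECORDS, 5000, use_processing_time=True)
```

Under event time the delayed record (22) still belongs to the window starting at 0, so that window's result has to change after the fact. Under processing time it simply lands in the window starting at 5000, and nothing ever arrives "late".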

The single abstraction of "stream time" proves leaky in some cases (e.g. 
for the punctuate method, which is being addressed in KIP-138). Perhaps this is 
another case where processing-time semantics warrant explicit handling 
in the API, but of course only if there's sufficient user demand for this.

What I could imagine is a new type of time window 
(ProcessingTimeWindow?). If it were used in an aggregation, the underlying 
processor would force the WallclockTimestampExtractor (KAFKA-4144 
enables that) and would use system-time punctuation (KIP-138) to 
send the final aggregation value once the window has expired. It could 
also be configured not to send intermediate updates while the window is open.
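
A rough sketch of the behaviour I have in mind, as a plain Python simulation rather than Kafka Streams code. The class name FinalOnlyWindowedMax, its methods, and the explicit now_ms argument (standing in for the wall clock and for a system-time punctuation callback) are all hypothetical.

```python
class FinalOnlyWindowedMax:
    """Tumbling wall-clock windows that emit one max per window, on expiry."""

    def __init__(self, size_ms):
        self.size_ms = size_ms
        self.open_windows = {}  # window start -> running max
        self.emitted = []       # (window start, final max), one per window

    def process(self, value, now_ms):
        """Fold a record into the window that is open at wall-clock now_ms."""
        start = now_ms - (now_ms % self.size_ms)
        current = self.open_windows.get(start)
        self.open_windows[start] = value if current is None else max(current, value)

    def punctuate(self, now_ms):
        """Stand-in for a system-time punctuation: flush expired windows."""
        for start in sorted(self.open_windows):
            if start + self.size_ms <= now_ms:
                self.emitted.append((start, self.open_windows.pop(start)))


# The sequence from the original question, one value per second.
agg = FinalOnlyWindowedMax(size_ms=5000)
for second, value in enumerate([21, 25, 22, 20, 26]):
    agg.process(value, now_ms=second * 1000)
    agg.punctuate(now_ms=second * 1000)  # window still open, nothing emitted
agg.punctuate(now_ms=5000)               # window [0, 5000) has expired
```

After the last punctuation agg.emitted holds the single pair (0, 26), which is the Spark-style output, instead of the five intermediate updates 21, 25, 25, 25, 26.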

Of course this is just a helper for the users, since they can implement 
it all themselves using the low-level API, as Matthias pointed out 
already. Just seems there's recurring interest in this.

Again, this only makes sense for processing time semantics. For 
event-time semantics I find the arguments for "no final aggregation" 
totally convincing.


Cheers,

Michał


On 16/06/17 00:08, Matthias J. Sax wrote:
> Hi Paolo,
>
> This SO question might help, too:
> https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>
> For Streams, the basic model is based on "change" and we report updates
> to the "current" result immediately reducing latency to a minimum.
>
> Last, if you say it's going to fall into the next window, you won't get
> event time semantics but you fall back to processing time semantics, which
> cannot provide exact results....
>
> If you really want to trade off correctness versus getting (late)
> updates and want to use processing time semantics, you should configure
> WallclockTimestampExtractor and implement an "update deduplication"
> operator using table.toStream().transform(). You can attach state to
> your transformer and store all updates there (i.e., newer updates overwrite
> older updates). Punctuations allow you to emit "final" results for
> windows whose "window end time" has passed.
>
>
> -Matthias
>
> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>> Hi Eno,
>>
>>
>> regarding closing window I think that it's up to the streaming application. I mean ...
>>
>> If I want something like I described, I know that a value outside my 5-second window will be taken into account in the next processing round (in the next 5 seconds). I don't think I'm losing a record; I am aware that this record will fall into the next "processing" window. Btw I'll take a look at your article! Thanks!
>>
>>
>> Paolo
>>
>>
>> Paolo Patierno
>> Senior Software Engineer (IoT) @ Red Hat
>> Microsoft MVP on Windows Embedded & IoT
>> Microsoft Azure Advisor
>>
>> Twitter : @ppatierno<http://twitter.com/ppatierno>
>> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
>> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>>
>>
>> ________________________________
>> From: Eno Thereska <en...@gmail.com>
>> Sent: Thursday, June 15, 2017 3:57 PM
>> To: users@kafka.apache.org
>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>
>> Hi Paolo,
>>
>> Yeah, so if you want fewer records, you should actually "not" disable cache. If you disable cache you'll get all the records as you described.
>>
>> About closing windows: if you close a window and a late record arrives that should have been in that window, you basically lose the ability to process that record. In Kafka Streams we are robust to that, in that we handle late arriving records. There is a comparison here for example when we compare it to other methods that depend on watermarks or triggers: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>
>> Eno
>>
>>
>>> On 15 Jun 2017, at 14:57, Paolo Patierno <pp...@live.com> wrote:
>>>
>>> Hi Eno,
>>>
>>>
>>> thanks for the reply !
>>>
>>> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so disabling cache).
>>>
>>> Regarding the interactive query API (I'll take a look), it means that it's up to the application to do something like what we get out of the box with Spark.
>>>
>>> May I ask what you mean by "We don’t believe in closing windows"? Isn't it much more code that the user has to write to get the same result?
>>>
>>> I'm exploring Kafka Streams and it's very powerful IMHO, not least because the usage is pretty simple, but in this scenario it seems to fall short of Spark.
>>>
>>>
>>> Thanks,
>>>
>>> Paolo.
>>>
>>>
>>> ________________________________
>>> From: Eno Thereska <en...@gmail.com>
>>> Sent: Thursday, June 15, 2017 1:45 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>
>>> Hi Paolo,
>>>
>>> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
>>> You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl.
>>>
>>> Alternatively you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive), see this: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>>
>>> Thanks
>>> Eno
>>>> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <pp...@live.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>> using the streams library I noticed a difference (or there is a lack of knowledge on my side) with Apache Spark.
>>>>
>>>> Imagine the following scenario ...
>>>>
>>>>
>>>> I have a source topic where numeric values come in and I want to compute the maximum value over the latest 5 seconds, but put that max value into a destination topic only once every 5 seconds.
>>>>
>>>> This is what happens with the reduceByWindow method in Spark.
>>>>
>>>> I'm using reduce on a KStream here; it computes the max value taking into account previous values in the latest 5 seconds, but the current result is put into the destination topic for each incoming value.
>>>>
>>>>
>>>> For example ...
>>>>
>>>>
>>>> An application sends numeric values every 1 second.
>>>>
>>>> With Spark ... the source gets values every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.
>>>>
>>>> With Kafka Streams ... the source gets values every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 1 second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>>>>
>>>>
>>>> Is the Spark behaviour possible with Kafka Streams? Or is it something to do at the application level?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Paolo
>>>>
>>


Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Jay Kreps <ja...@confluent.io>.
I think the question is when do you actually *want* processing time
semantics? There are definitely times when it's safe to assume the two are
close enough that a little lossiness doesn't matter much, but it is pretty
hard to make assumptions about when the processing time is, and it has been
hard for us to think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world

If the root problem is confusion, adding knobs never makes it better. If the
root problem is that we're missing important use cases that justify the
additional knobs, then I think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates;
examples would be email, Twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator in the DSL. Something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window, emits and
enforces a single output 30 seconds after the advancement of stream time,
and remembers that it has been emitted, suppressing all further
output (so the output is actually a KStream). This might or might not
depend on wall clock time. Perhaps this is in fact what you are proposing?
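
To pin down what I mean, here is one possible reading as a plain Python simulation. It is only a sketch: freezeAfter does not exist in the DSL, the class and method names below are invented, and it assumes the stream-time variant, where the deadline for a window is its end time plus the grace period.

```python
class FreezeAfter:
    """Emit one value per window once stream time passes window end + grace."""

    def __init__(self, grace_ms):
        self.grace_ms = grace_ms
        self.stream_time_ms = 0
        self.pending = {}  # window end -> latest update, not yet emitted
        self.output = []   # (window end, value), at most one per window

    def on_update(self, window_end_ms, value, record_ts_ms):
        # Stream time only moves forward, driven by record timestamps.
        self.stream_time_ms = max(self.stream_time_ms, record_ts_ms)
        self._emit_due()
        if window_end_ms + self.grace_ms <= self.stream_time_ms:
            return  # window is frozen: suppress the late update
        self.pending[window_end_ms] = value  # newer update overwrites older

    def _emit_due(self):
        for end in sorted(self.pending):
            if end + self.grace_ms <= self.stream_time_ms:
                self.output.append((end, self.pending.pop(end)))


op = FreezeAfter(grace_ms=30_000)
op.on_update(5000, 21, record_ts_ms=0)
op.on_update(5000, 25, record_ts_ms=1000)
op.on_update(5000, 26, record_ts_ms=4000)
op.on_update(10_000, 7, record_ts_ms=36_000)  # pushes stream time past 35 s
op.on_update(5000, 99, record_ts_ms=3000)     # late update for a frozen window
```

Here op.output ends up as the single pair (5000, 26) and the late 99 is dropped. Because stream time never goes backwards in this sketch, comparing against the deadline is enough to "remember" that a window was emitted; a wall-clock variant would need a timer instead of piggybacking on incoming records.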

-Jay




Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks Michał!

That is very good feedback.


-Matthias

On 6/16/17 2:38 AM, Michal Borowiecki wrote:
> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple
> times, right?
> 
> Personally, I agree totally with the philosophy of "no final
> aggregation", as expressed by Eno's post, but IMO that is predicated
> totally on event-time semantics.
> 
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record
> just falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
> 
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is
> another case where processing-time semantics warrant explicit handling
> in the api - but of course, only if there's sufficient user demand for this.
> 
> What I could imagine is a new type of time window
> (ProcessingTimeWindow?), that if used in an aggregation, the underlying
> processor would force the WallclockTimestampExtractor (KAFKA-4144
> enables that) and would use the system-time punctuation (KIP-138) to
> send the final aggregation value once the window has expired and could
> be configured to not send intermediate updates while the window was open.
> 
> Of course this is just a helper for the users, since they can implement
> it all themselves using the low-level API, as Matthias pointed out
> already. Just seems there's recurring interest in this.
> 
> Again, this only makes sense for processing time semantics. For
> event-time semantics I find the arguments for "no final aggregation"
> totally convincing.
> 
> 
> Cheers,
> 
> Michał
> 
> 
> On 16/06/17 00:08, Matthias J. Sax wrote:
>> Hi Paolo,
>>
>> This SO question might help, too:
>> https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>>
>> For Streams, the basic model is based on "change" and we report updates
>> to the "current" result immediately, reducing latency to a minimum.
>>
>> Last, if you say it's going to fall into the next window, you won't get
>> event-time semantics but fall back to processing-time semantics, which
>> cannot provide exact results....
>>
>> If you really want to trade off correctness against getting (late)
>> updates and want to use processing-time semantics, you should configure
>> WallclockTimestampExtractor and implement an "update deduplication"
>> operator using table.toStream().transform(). You can attach a state store
>> to your transformer and store all updates there (i.e., newer updates
>> overwrite older updates). Punctuations allow you to emit "final" results
>> for windows for which the "window end time" has passed.
>>
>>
>> -Matthias
>>
>> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>>> Hi Eno,
>>>
>>>
>>> regarding closing windows, I think that it's up to the streaming application. I mean ...
>>>
>>> If I want something like I described, I know that a value outside my 5 seconds window will be taken into account for the next processing (in the next 5 seconds). I don't think I'm losing a record; I am aware that this record will fall in the next "processing" window. Btw I'll take a look at your article! Thanks!
>>>
>>>
>>> Paolo
>>>
>>> ________________________________
>>> From: Eno Thereska <en...@gmail.com>
>>> Sent: Thursday, June 15, 2017 3:57 PM
>>> To: users@kafka.apache.org
>>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>
>>> Hi Paolo,
>>>
>>> Yeah, so if you want fewer records, you should actually "not" disable cache. If you disable cache you'll get all the records as you described.
>>>
>>> About closing windows: if you close a window and a late record arrives that should have been in that window, you basically lose the ability to process that record. In Kafka Streams we are robust to that, in that we handle late-arriving records. There is a comparison here, for example, with other methods that depend on watermarks or triggers: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>>
>>> Eno
>>>
>>>
>>>> On 15 Jun 2017, at 14:57, Paolo Patierno <pp...@live.com> wrote:
>>>>
>>>> Hi Eno,
>>>>
>>>>
>>>> thanks for the reply!
>>>>
>>>> Regarding the cache, I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so disabling the cache).
>>>>
>>>> Regarding the interactive query API (I'll take a look), it means that it's up to the application to do something like what we have out of the box with Spark.
>>>>
>>>> May I ask what you mean by "We don’t believe in closing windows"? Isn't it much more code that the user has to write to get the same result?
>>>>
>>>> I'm exploring Kafka Streams and it's very powerful imho, also because the usage is pretty simple, but this scenario could be a shortcoming compared to Spark.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Paolo.
>>>>
>>>> ________________________________
>>>> From: Eno Thereska <en...@gmail.com>
>>>> Sent: Thursday, June 15, 2017 1:45 PM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>>
>>>> Hi Paolo,
>>>>
>>>> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
>>>> You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>>>>
>>>> Alternatively you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive), see https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>>>
>>>> Thanks
>>>> Eno
>>>>> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <pp...@live.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> using the streams library I noticed a difference (or there is a lack of knowledge on my side) compared with Apache Spark.
>>>>>
>>>>> Imagine the following scenario ...
>>>>>
>>>>>
>>>>> I have a source topic where numeric values come in and I want to compute the maximum value over the latest 5 seconds but ... put the max value into a destination topic only every 5 seconds.
>>>>>
>>>>> This is what happens with the reduceByWindow method in Spark.
>>>>>
>>>>> I'm using reduce on a KStream here, which computes the max value taking into account previous values in the latest 5 seconds, but the updated value is put into the destination topic for each incoming value.
>>>>>
>>>>>
>>>>> For example ...
>>>>>
>>>>>
>>>>> An application sends numeric values every second.
>>>>>
>>>>> With Spark ... the source gets values every second, computes the max over a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.
>>>>>
>>>>> With Kafka Streams ... the source gets values every second, computes the max over a window of 5 seconds, and puts the max into the destination every second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>>>>>
>>>>>
>>>>> Is it possible with Kafka Streams? Or is it something to do at the application level?
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Paolo
>>>>>
>>>


Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Guozhang Wang <wa...@gmail.com>.
Although this request comes up often in the community, to me people are
actually asking for slightly different things when they use terms like
"one final output", "trigger", or "window closure". So I'd like to try
summarizing them here, based on my own understanding, before discussing
further:

1). "Not caching globally": I would like to have "de-duplicated" outputs
on some of the stateful operators but not on others, whereas currently the
caching config is global, meaning that once it is turned on all stores are
covered by a cache.

To me this is a valid use case that is not supported today; for example,
we could consider offering such per-store caching in the state store
supplier API.

2). "Final output of the window": I want to know which output is the final
result for a given window's state updates (it could be the ONLY output for
this window, or there could be some early "partial" outputs), such that no
more outputs will be generated afterwards.

To me this is half valid use case and half confusion: theoretically there
should be no "final output", and in practice the final output depends on
an implementation detail, the window retention period, which is usually
not related to the computational logic at all but is set more for
monitoring / operational purposes. Today this can still be done through
the Interactive Queries feature, by checking which window stores are still
available and then responding accordingly that some state store values
will not change any more.

3). "No bytes-based triggers": The current mechanism looks just like a
"bytes-based triggering" mechanism to me, whereas I want "time-based
triggering". I think Paolo's use case falls into this category, but there
are still some sub-categories:

  3.1). "Processing-time based triggers": I think this is what Jay's
proposed solution tries to tackle. E.g. even if we are in catch-up mode
and process a day's data in 5 min, we will only output 5 times if the
output interval is 1 min. This is where users want to have some partial
results to reduce the application's end-to-end latency, which bytes-based
triggers cannot guarantee;

  3.2). "Event-time based triggers": this is what Paolo's use case falls
into. E.g. if we process a day's data in 5 min, we will output 24 * 60 / 5
= 288 times (taking an output interval of 5 min of event time), as that
much event time has advanced. This is where --- like Paolo said --- users
want to maintain some history of the updates besides the final output,
which bytes-based triggers cannot guarantee.

For both of these cases, with KIP-138 these should be addressable in the
lower-level Processor API: you can punctuate every minute to send out
results, based either on wall-clock (system) time or on stream time, i.e.
event time. The question is whether / how we want to support such syntax
in the higher-level DSL: personally I think both sub-categories are valid
cases, but 3.1) may be more commonly motivated than 3.2).
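
To make the arithmetic in 3.1) and 3.2) concrete, here is a minimal sketch in
plain Java. It is a toy model, not Kafka Streams code: PunctuationCount and
firings are names made up for this illustration, and the 5 min event-time
interval in the second call is an assumption chosen so that the result
matches the 24 * 60 / 5 figure in 3.2).

```java
/** Toy model (not Kafka Streams code) of how often a periodic punctuation fires. */
final class PunctuationCount {

    /** Number of firings while the driving clock advances by elapsedMs. */
    static long firings(long elapsedMs, long intervalMs) {
        return elapsedMs / intervalMs;
    }

    public static void main(String[] args) {
        long minuteMs = 60_000L;
        long wallClockElapsedMs = 5 * minuteMs;        // catch-up takes 5 min of real time
        long eventTimeElapsedMs = 24 * 60 * minuteMs;  // and replays one day of event time

        // 3.1) trigger driven by wall-clock time, 1 min interval
        System.out.println(firings(wallClockElapsedMs, minuteMs));      // prints 5
        // 3.2) trigger driven by event time, assumed 5 min interval
        System.out.println(firings(eventTimeElapsedMs, 5 * minuteMs));  // prints 288
    }
}
```

With a 1 min interval on the event-time clock as well, the second call would
return 24 * 60 = 1440, so the clock that drives the trigger matters far more
during catch-up than the interval itself.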


In terms of importance / priority I think 1) > 3.1) > 3.2) >> 2).


Guozhang


On Sun, Jun 18, 2017 at 3:04 AM, Paolo Patierno <pp...@live.com> wrote:

> I'm just thinking that having output into a topic every X seconds thanks
> to the windowing could be a useful feature, without using something like
> interactive queries, which are really powerful (I love them) but aren't so
> useful in this scenario.
>
> Using the caching parameter isn't useful in such a scenario because it's
> in terms of bytes, not in terms of time.
>
>
> Let's consider another scenario ...
>
>
> I have a sensor sending data every second. Let's assume that our stream
> processing application is not online and the source topic is being filled
> with sensor data carrying its event time.
>
> When the stream processing application comes online, I'd like to have a
> record in the final topic for every 5 seconds, in order to have a history
> as well (because the application was offline). To be clear ...
>
> Imagine that starting from t = 0, the sensor starts to send data but the
> application is offline, and the topic is filled from t = 0 to t = 12 (with
> 12 events, one per second).
>
> At t = 12 the application comes back online and processes the stream in
> order: first the data from t = 0 to t = 4 (the first 5 seconds), putting
> the result into the destination queue, then the data from t = 5 to t = 9
> (the next 5 seconds), putting the result into the destination queue, and
> so on. If the sensor rate isn't too fast, the application will start to
> process in real time at some point (it seems to me something like batch
> processing that becomes real-time processing).
>
> This scenario, for example, isn't possible with Spark today, because when
> the application comes back online it processes all data from t = 0 to
> t = 12 immediately, as if it were a single burst of data, without using t
> as the event time for processing.
>
>
> I'm thinking aloud, considering some scenarios that could have value in
> the IoT space ...
>
>
> Thanks,
>
> Paolo.
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno <http://twitter.com/ppatierno>
> Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience <http://paolopatierno.wordpress.com/>
>
>
> ------------------------------
> *From:* Michal Borowiecki <mi...@openbet.com>
> *Sent:* Sunday, June 18, 2017 9:34 AM
> *To:* dev@kafka.apache.org; Jay Kreps
> *Cc:* users@kafka.apache.org; Matthias J. Sax
>
> *Subject:* Re: Kafka Streams vs Spark Streaming : reduce by window
>
>
> If confusion is the problem, then totally agree no point adding more
> knobs. Perhaps you're right that users don't *really* want
> processing-time semantics. Just *think* they want them until they start
> considering replay/catch-up scenarios. I guess people rarely think about
> those from the start (I sure didn't).
>
> Cheers,
>
> Michał
>
> On 16/06/17 17:54, Jay Kreps wrote:
>
> I think the question is when do you actually *want* processing-time
> semantics? There are definitely times when it's safe to assume the two are
> close enough that a little lossiness doesn't matter much, but it is pretty
> hard to make assumptions about when processing actually happens, and it
> has been hard for us to think of a use case where it's actually desirable.
>
> I think mostly what we've seen is confusion about the core concepts:
>
>    - stream -- immutable events that occur
>    - tables (including windows) -- current state of the world
>
> If the root problem is confusion, adding knobs never makes it better. If
> the root problem is that we're missing important use cases that justify
> the additional knobs, then I think it's good to try to really understand
> them. I think there could be use cases around systems that don't take
> updates; examples would be email, twitter, and some metrics stores.
>
> One solution that would induce less complexity than allowing new
> semantics, but might help with the use cases we need to collect, would be
> to add a new operator in the DSL. Something like .freezeAfter(30,
> TimeUnit.SECONDS) that collects all updates for a given window, both
> emits and enforces a single output 30 seconds after the advancement of
> stream time, and remembers that it has been emitted, suppressing all
> further output (so the output is actually a KStream). This might or might
> not depend on wall clock time. Perhaps this is in fact what you are
> proposing?
>
> -Jay



-- 
-- Guozhang

Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Paolo Patierno <pp...@live.com>.
I'm just thinking that having output into a topic every X seconds thanks to the windowing could be a useful feature, without using something like interactive queries, which are really powerful (I love them) but aren't so useful in this scenario.

Using the caching parameter isn't useful in such a scenario because it's in terms of bytes, not in terms of time.
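
For reference, the record cache is bounded by bytes, but cached updates are also flushed downstream whenever the application commits, so the commit interval acts as a rough time bound. Below is a minimal sketch of the two settings using plain java.util.Properties with the literal config keys; the values are arbitrary examples and CacheConfigSketch is a made-up name for this illustration.

```java
import java.util.Properties;

/** Sketch of the two Kafka Streams settings that govern how often cached updates are forwarded. */
final class CacheConfigSketch {

    static Properties cacheSettings() {
        Properties props = new Properties();
        // Key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; 0 disables the cache.
        props.setProperty("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024));
        // Key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; caches are flushed on commit.
        props.setProperty("commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = cacheSettings();
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
        System.out.println(props.getProperty("commit.interval.ms"));
    }
}
```

This only reduces the number of intermediate updates. Flushes are not aligned with window boundaries, so it does not guarantee exactly one record per window.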


Let's consider another scenario ...


I have a sensor sending data every second. Let's assume that our stream processing application is not online and the source topic is being filled with sensor data carrying its event time.

When the stream processing application comes online, I'd like to have a record in the final topic for every 5 seconds, in order to have a history as well (because the application was offline). To be clear ...

Imagine that starting from t = 0, the sensor starts to send data but the application is offline, and the topic is filled from t = 0 to t = 12 (with 12 events, one per second).

At t = 12 the application comes back online and processes the stream in order: first the data from t = 0 to t = 4 (the first 5 seconds), putting the result into the destination queue, then the data from t = 5 to t = 9 (the next 5 seconds), putting the result into the destination queue, and so on. If the sensor rate isn't too fast, the application will start to process in real time at some point (it seems to me something like batch processing that becomes real-time processing).

This scenario, for example, isn't possible with Spark today, because when the application comes back online it processes all data from t = 0 to t = 12 immediately, as if it were a single burst of data, without using t as the event time for processing.
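
The catch-up behaviour described here can be sketched as a small simulation in plain Java. This is not Kafka Streams code: CatchUpWindows and closedWindowMaxima are hypothetical names, the sample values are invented, and the sketch assumes records arrive in timestamp order with no late records, so that "stream time" is simply the timestamp of the record being processed. A window's maximum is emitted once stream time reaches the window end, regardless of how fast the backlog is replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy simulation of event-time tumbling windows during catch-up (not Kafka Streams code). */
final class CatchUpWindows {

    /** Returns one "[start,end)=max" entry per window closed by the advance of event time. */
    static List<String> closedWindowMaxima(long[] eventTimesSec, int[] values, long windowSizeSec) {
        TreeMap<Long, Integer> maxByWindowStart = new TreeMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < eventTimesSec.length; i++) {
            long streamTime = eventTimesSec[i]; // assumption: input is in timestamp order
            long windowStart = (streamTime / windowSizeSec) * windowSizeSec;
            maxByWindowStart.merge(windowStart, values[i], Integer::max);
            // Emit every window whose end is not after the current stream time.
            while (!maxByWindowStart.isEmpty()
                    && maxByWindowStart.firstKey() + windowSizeSec <= streamTime) {
                Map.Entry<Long, Integer> closed = maxByWindowStart.pollFirstEntry();
                long start = closed.getKey();
                emitted.add("[" + start + "," + (start + windowSizeSec) + ")=" + closed.getValue());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 12 backlog events, one per second of event time (t = 0 .. 11).
        long[] times = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        int[] values = {21, 25, 22, 20, 26, 30, 28, 27, 31, 29, 24, 23};
        for (String result : closedWindowMaxima(times, values, 5)) {
            System.out.println(result);
        }
    }
}
```

With this input the sketch emits one result for [0,5) and one for [5,10); the window [10,15) stays open until a record with event time 15 or later arrives.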


I'm thinking aloud, considering some scenarios that could have value in the IoT space ...


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>


________________________________
From: Michal Borowiecki <mi...@openbet.com>
Sent: Sunday, June 18, 2017 9:34 AM
To: dev@kafka.apache.org; Jay Kreps
Cc: users@kafka.apache.org; Matthias J. Sax
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window


If confusion is the problem, then totally agree no point adding more knobs. Perhaps you're right that users don't really want processing-time semantics. Just think they want them until they start considering replay/catch-up scenarios. I guess people rarely think about those from the start (I sure didn't).

Cheers,

Michał

On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually want processing time semantics? There are definitely times when its safe to assume the two are close enough that a little lossiness doesn't matter much but it is pretty hard to make assumptions about when the processing time is and has been hard for us to think of a use case where its actually desirable.

I think mostly what we've seen is confusion about the core concepts:

  *   stream -- immutable events that occur
  *   tables (including windows) -- current state of the world

If the root problem is confusion, adding knobs never makes it better. If the root problem is that we're missing important use cases that justify the additional knobs, then I think it's good to try to really understand them. I think there could be use cases around systems that don't take updates; examples would be email, twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new semantics, but might help with the use cases we need to collect, would be to add a new operator in the DSL. Something like .freezeAfter(30, TimeUnit.SECONDS) that collects all updates for a given window, emits and enforces a single output 30 seconds after the advancement of stream time, and remembers that it has been emitted, suppressing all further output (so the output is actually a KStream). This might or might not depend on wall clock time. Perhaps this is in fact what you are proposing?
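
One possible reading of the proposed operator can be sketched in plain Java. This is only an illustration under stated assumptions: .freezeAfter does not exist in the DSL, the class below is hypothetical, the aggregation is a max over tumbling windows, and "freezing" is assumed to happen once stream time (the highest record timestamp seen so far) reaches the window end plus the delay. After a window is frozen, late records for it are dropped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class FreezeAfterSketch {
    private final long windowSizeMs;
    private final long freezeDelayMs;
    // Window start -> current maximum for windows that are still open.
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    // Windows whose single result was already emitted (grows without bound in this sketch).
    private final Set<Long> frozen = new HashSet<>();
    private long streamTimeMs = Long.MIN_VALUE;

    public FreezeAfterSketch(long windowSizeMs, long freezeDelayMs) {
        this.windowSizeMs = windowSizeMs;
        this.freezeDelayMs = freezeDelayMs;
    }

    // Processes one record and returns the results frozen by this record, if any.
    public List<Integer> onRecord(long timestampMs, int value) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        if (!frozen.contains(windowStart)) {
            open.merge(windowStart, value, Math::max);
        } // else: the window was already emitted, so the late record is suppressed

        // Stream time only moves forward.
        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        List<Integer> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Integer>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> e = it.next();
            // Entries are sorted by window start, so stop at the first window not yet due.
            if (e.getKey() + windowSizeMs + freezeDelayMs > streamTimeMs) break;
            emitted.add(e.getValue());
            frozen.add(e.getKey());
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        FreezeAfterSketch op = new FreezeAfterSketch(5, 3);
        op.onRecord(0, 21);
        op.onRecord(4, 26);
        System.out.println(op.onRecord(8, 3));  // [26]: window [0, 5) is frozen at stream time 8
        System.out.println(op.onRecord(2, 99)); // []: late record for a frozen window is dropped
    }
}
```

Because every downstream record is final, the output behaves like a stream of facts rather than a changelog, which matches the remark that the output is actually a KStream.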

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <mi...@openbet.com> wrote:

I wonder if it's a frequent enough use case that Kafka Streams should consider providing this out of the box - this was asked for multiple times, right?

Personally, I agree totally with the philosophy of "no final aggregation", as expressed by Eno's post, but IMO that is predicated totally on event-time semantics.

If users want processing-time semantics then, as the docs already point out, there is no such thing as a late-arriving record - every record just falls in the currently open window(s), hence the notion of final aggregation makes perfect sense, from the usability point of view.

The single abstraction of "stream time" proves leaky in some cases (e.g. for punctuate method - being addressed in KIP-138). Perhaps this is another case where processing-time semantics warrant explicit handling in the api - but of course, only if there's sufficient user demand for this.

What I could imagine is a new type of time window (ProcessingTimeWindow?), that if used in an aggregation, the underlying processor would force the WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the system-time punctuation (KIP-138) to send the final aggregation value once the window has expired and could be configured to not send intermediate updates while the window was open.

Of course this is just a helper for the users, since they can implement it all themselves using the low-level API, as Matthias pointed out already. Just seems there's recurring interest in this.

Again, this only makes sense for processing time semantics. For event-time semantics I find the arguments for "no final aggregation" totally convincing.


Cheers,

Michał

On 16/06/17 00:08, Matthias J. Sax wrote:

Hi Paolo,

This SO question might help, too:
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately, reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event-time semantics but you fall back to processing-time semantics, which
cannot provide exact results....

If you really want to trade off correctness versus getting (late)
updates and want to use processing-time semantics, you should configure
WallclockTimestampExtractor and implement an "update deduplication"
operator using table.toStream().transform(). You can attach a state store to
your transformer and store all updates there (i.e., newer updates overwrite
older updates). Punctuations allow you to emit "final" results for
windows for which the "window end time" has passed.
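
The "update deduplication" idea can be sketched without the Kafka Streams API, as a plain Java simulation. Everything here is an assumption made for illustration: the class name is hypothetical, a TreeMap stands in for the state store, a single record key is assumed, and punctuate(nowMs) stands in for the periodic punctuation callback. In a real application the same logic would live inside the transformer described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FinalResultBuffer {
    private final long windowSizeMs;
    // Stand-in for the state store: window start -> latest aggregate seen.
    private final TreeMap<Long, Integer> latest = new TreeMap<>();

    public FinalResultBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every update coming from the windowed table;
    // a newer update overwrites the older one for the same window.
    public void update(long windowStartMs, int aggregate) {
        latest.put(windowStartMs, aggregate);
    }

    // Called periodically with the current time; emits and forgets
    // the last stored value of every window whose end time has passed.
    public List<Integer> punctuate(long nowMs) {
        List<Integer> finals = new ArrayList<>();
        Iterator<Map.Entry<Long, Integer>> it = latest.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> e = it.next();
            // Entries are sorted by window start, so later windows are still open.
            if (e.getKey() + windowSizeMs > nowMs) break;
            finals.add(e.getValue());
            it.remove();
        }
        return finals;
    }

    public static void main(String[] args) {
        FinalResultBuffer buffer = new FinalResultBuffer(5000);
        buffer.update(0, 21);
        buffer.update(0, 25);
        buffer.update(0, 26);
        System.out.println(buffer.punctuate(4999)); // []: window [0, 5000) is still open
        System.out.println(buffer.punctuate(5000)); // [26]: only the final value is emitted
    }
}
```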


-Matthias

On 6/15/17 9:21 AM, Paolo Patierno wrote:


Hi Eno,


regarding closing window I think that it's up to the streaming application. I mean ...

If I want something like what I described, I know that a value outside my 5-second window will be taken into account for the next processing (in the next 5 seconds). I don't think I'm losing a record; I am aware that this record will fall in the next "processing" window. Btw I'll take a look at your article! Thanks!


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>


________________________________
From: Eno Thereska <en...@gmail.com>
Sent: Thursday, June 15, 2017 3:57 PM
To: users@kafka.apache.org<ma...@kafka.apache.org>
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable cache. If you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that should have been in that window, you basically lose the ability to process that record. In Kafka Streams we are robust to that, in that we handle late-arriving records. There is a comparison here, for example, with other methods that depend on watermarks or triggers: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

Eno




On 15 Jun 2017, at 14:57, Paolo Patierno <pp...@live.com> wrote:

Hi Eno,


thanks for the reply !

Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so disabling cache).

Regarding the interactive query API (I'll take a look), it means that it's up to the application to do something that we get out of the box with Spark.

May I ask what you mean by "We don’t believe in closing windows"? Doesn't it mean much more code that the user has to write to get the same result?

I'm exploring Kafka Streams and it's very powerful imho, also because the usage is pretty simple, but in this scenario it could be lacking compared to Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>


________________________________
From: Eno Thereska <en...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: users@kafka.apache.org<ma...@kafka.apache.org>
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka Streams.
You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl

Alternatively you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive); see https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Thanks
Eno


On Jun 15, 2017, at 2:38 PM, Paolo Patierno <pp...@live.com> wrote:

Hi,


using the streams library I noticed a difference (or there is a lack of knowledge on my side) with Apache Spark.

Imagine the following scenario ...


I have a source topic where numeric values come in and I want to check the maximum value in the last 5 seconds but ... putting the max value into a destination topic only every 5 seconds.

This is what happens with the reduceByWindow method in Spark.

I'm using reduce on a KStream here, which computes the max value taking into account the previous values in the last 5 seconds, but the resulting value is put into the destination topic for each incoming value.


For example ...


An application sends numeric values every 1 second.

With Spark ... the source gets values every 1 second, processes the max in a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.

With Kafka Streams ... the source gets values every 1 second, processes the max in a window of 5 seconds, and puts the max into the destination every 1 second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
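
The two emission behaviours described above can be reproduced with a small plain-Java simulation. This is only a sketch of the observable output, not of how either framework is implemented. The class and method names are hypothetical, and it assumes one value per second, so that a 5-second window holds 5 values and all values in the example fall into a single window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowedMaxDemo {

    // Emits the running maximum after every incoming value
    // (the per-update behaviour described for Kafka Streams).
    // Assumes all values belong to the same window.
    public static List<Integer> maxPerUpdate(List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
            out.add(max);
        }
        return out;
    }

    // Emits one maximum per group of windowSize values
    // (the once-per-window behaviour described for Spark).
    public static List<Integer> maxPerWindow(List<Integer> values, int windowSize) {
        List<Integer> out = new ArrayList<>();
        for (int start = 0; start < values.size(); start += windowSize) {
            int end = Math.min(start + windowSize, values.size());
            int max = Integer.MIN_VALUE;
            for (int v : values.subList(start, end)) {
                max = Math.max(max, v);
            }
            out.add(max);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(21, 25, 22, 20, 26);
        System.out.println(maxPerUpdate(values));    // [21, 25, 25, 25, 26]
        System.out.println(maxPerWindow(values, 5)); // [26]
    }
}
```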


Is this possible with Kafka Streams? Or is it something to do at the application level?


Thanks,

Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Michal Borowiecki <mi...@openbet.com>.
If confusion is the problem, then totally agree no point adding more 
knobs. Perhaps you're right that users don't /really/ want 
processing-time semantics. Just /think/ they want them until they start 
considering replay/catch-up scenarios. I guess people rarely think about 
those from the start (I sure didn't).

Cheers,

Michał


On 16/06/17 17:54, Jay Kreps wrote:
> I think the question is when do you actually /want/ processing time 
> semantics? There are definitely times when it's safe to assume the two 
> are close enough that a little lossiness doesn't matter much, but it is 
> pretty hard to make assumptions about when the processing time is, and 
> it has been hard for us to think of a use case where it's actually desirable.
>
> I think mostly what we've seen is confusion about the core concepts:
>
>   * stream -- immutable events that occur
>   * tables (including windows) -- current state of the world
>
> If the root problem is confusion, adding knobs never makes it better. 
> If the root problem is that we're missing important use cases that justify 
> the additional knobs, then I think it's good to try to really 
> understand them. I think there could be use cases around systems that 
> don't take updates; examples would be email, twitter, and some metrics 
> stores.
>
> One solution that would be less complexity inducing than allowing new 
> semantics, but might help with the use cases we need to collect, would 
> be to add a new operator in the DSL. Something like .freezeAfter(30, 
> TimeUnit.SECONDS) that collects all updates for a given window, 
> emits and enforces a single output 30 seconds after the 
> advancement of stream time, and remembers that it has been emitted, 
> suppressing all further output (so the output is actually a KStream). 
> This might or might not depend on wall clock time. Perhaps this is in 
> fact what you are proposing?
>
> -Jay
>
>
>
> On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki 
> <michal.borowiecki@openbet.com <ma...@openbet.com>> 
> wrote:
>
>     I wonder if it's a frequent enough use case that Kafka Streams
>     should consider providing this out of the box - this was asked for
>     multiple times, right?
>
>     Personally, I agree totally with the philosophy of "no final
>     aggregation", as expressed by Eno's post, but IMO that is
>     predicated totally on event-time semantics.
>
>     If users want processing-time semantics then, as the docs already
>     point out, there is no such thing as a late-arriving record -
>     every record just falls in the currently open window(s), hence the
>     notion of final aggregation makes perfect sense, from the
>     usability point of view.
>
>     The single abstraction of "stream time" proves leaky in some cases
>     (e.g. for punctuate method - being addressed in KIP-138). Perhaps
>     this is another case where processing-time semantics warrant
>     explicit handling in the api - but of course, only if there's
>     sufficient user demand for this.
>
>     What I could imagine is a new type of time window
>     (ProcessingTimeWindow?), that if used in an aggregation, the
>     underlying processor would force the WallclockTimestampExtractor
>     (KAFKA-4144 enables that) and would use the system-time
>     punctuation (KIP-138) to send the final aggregation value once the
>     window has expired and could be configured to not send
>     intermediate updates while the window was open.
>
>     Of course this is just a helper for the users, since they can
>     implement it all themselves using the low-level API, as Matthias
>     pointed out already. Just seems there's recurring interest in this.
>
>     Again, this only makes sense for processing time semantics. For
>     event-time semantics I find the arguments for "no final
>     aggregation" totally convincing.
>
>
>     Cheers,
>
>     Michał
>
>
>     On 16/06/17 00:08, Matthias J. Sax wrote:
>>     Hi Paolo,
>>
>>     This SO question might help, too:
>>     https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>>
>>     For Streams, the basic model is based on "change" and we report updates
>>     to the "current" result immediately reducing latency to a minimum.
>>
>>     Last, if you say it's going to fall into the next window, you won't get
>>     event-time semantics; you fall back to processing-time semantics, which
>>     cannot provide exact results.
>>
>>     If you really want to trade off correctness versus getting (late)
>>     updates and want to use processing-time semantics, you should configure
>>     WallclockTimestampExtractor and implement an "update deduplication"
>>     operator using table.toStream().transform(). You can attach a state
>>     store to your transformer and store all updates there (i.e., newer
>>     updates overwrite older updates). Punctuations allow you to emit "final"
>>     results for windows whose "window end time" has passed.
>>
>>
>>     -Matthias
>>
>>     On 6/15/17 9:21 AM, Paolo Patierno wrote:
>>>     Hi Eno,
>>>
>>>
>>>     regarding closing windows, I think that it's up to the streaming application. I mean ...
>>>
>>>     If I want something like what I described, I know that a value outside my 5-second window will be taken into account in the next processing round (in the next 5 seconds). I don't think I'm losing a record; I am aware that this record will fall into the next "processing" window. Btw I'll take a look at your article! Thanks!
>>>
>>>
>>>     Paolo
>>>
>>>
>>>     ________________________________
>>>     From: Eno Thereska <en...@gmail.com>
>>>     Sent: Thursday, June 15, 2017 3:57 PM
>>>     To: users@kafka.apache.org
>>>     Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>
>>>     Hi Paolo,
>>>
>>>     Yeah, so if you want fewer records, you should actually "not" disable cache. If you disable cache you'll get all the records as you described.
>>>
>>>     About closing windows: if you close a window and a late record arrives that should have been in that window, you basically lose the ability to process that record. Kafka Streams is robust to that, in that it handles late-arriving records. There is a comparison with other methods that depend on watermarks or triggers here: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>>
>>>     Eno
>>>
>>>
>>>>     On 15 Jun 2017, at 14:57, Paolo Patierno <pp...@live.com> wrote:
>>>>
>>>>     Hi Eno,
>>>>
>>>>
>>>>     thanks for the reply!
>>>>
>>>>     Regarding the cache, I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so disabling the cache).
>>>>
>>>>     Regarding the interactive query API (I'll take a look), it means that it's up to the application to do something that we get out of the box with Spark.
>>>>
>>>>     May I ask what you mean by "We don’t believe in closing windows"? Isn't it much more code that the user has to write to get the same result?
>>>>
>>>>     I'm exploring Kafka Streams and it's very powerful imho, also because the usage is pretty simple, but in this scenario it could be lacking compared to Spark.
>>>>
>>>>
>>>>     Thanks,
>>>>
>>>>     Paolo.
>>>>
>>>>
>>>>     ________________________________
>>>>     From: Eno Thereska <en...@gmail.com>
>>>>     Sent: Thursday, June 15, 2017 1:45 PM
>>>>     To: users@kafka.apache.org
>>>>     Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>>
>>>>     Hi Paolo,
>>>>
>>>>     That is indeed correct. We don’t believe in closing windows in Kafka Streams.
>>>>     You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>>>>
>>>>     Alternatively, you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive); see https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>>>>
>>>>     Thanks
>>>>     Eno
>>>>>     On Jun 15, 2017, at 2:38 PM, Paolo Patierno <pp...@live.com> wrote:
>>>>>
>>>>>     Hi,
>>>>>
>>>>>
>>>>>     using the streams library I noticed a difference with Apache Spark (or there is a lack of knowledge on my side).
>>>>>
>>>>>     Imagine the following scenario ...
>>>>>
>>>>>
>>>>>     I have a source topic where numeric values come in and I want to compute the maximum value over the latest 5 seconds, but put the max value into a destination topic only every 5 seconds.
>>>>>
>>>>>     This is what the reduceByWindow method does in Spark.
>>>>>
>>>>>     I'm using reduce on a KStream here, which computes the max value taking into account the previous values in the latest 5 seconds, but the result is put into the destination topic for each incoming value.
>>>>>
>>>>>
>>>>>     For example ...
>>>>>
>>>>>
>>>>>     An application sends numeric values every 1 second.
>>>>>
>>>>>     With Spark ... the source gets a value every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 26.
>>>>>
>>>>>     With Kafka Streams ... the source gets a value every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 1 second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>>>>>
>>>>>
>>>>>     Is this possible with Kafka Streams? Or is it something to do at the application level?
>>>>>
>>>>>
>>>>>     Thanks,
>>>>>
>>>>>     Paolo
>>>>>

Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Michal Borowiecki <mi...@openbet.com>.
If confusion is the problem, then I totally agree there's no point adding
more knobs. Perhaps you're right that users don't /really/ want
processing-time semantics; they just /think/ they want them until they
start considering replay/catch-up scenarios. I guess people rarely think
about those from the start (I sure didn't).
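
To make the replay/catch-up point concrete, here is a minimal sketch in
plain Java (no Kafka dependency). It is only an illustration under stated
assumptions: the class ReplayWindowSketch, its helper methods, the second
batch of five values and all timestamps are made up for this example; only
the first five values come from Paolo's original question. It buckets the
same records into 5-second tumbling windows, once by wall-clock time of
processing and once by event time.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not Kafka Streams code.
final class ReplayWindowSketch {
    static final long WINDOW_MS = 5_000L;
    // Paolo's sequence followed by five made-up values for a second window.
    static final int[] VALUES = {21, 25, 22, 20, 26, 30, 18, 19, 17, 16};

    /**
     * Max per 5-second tumbling window, keyed by window start.
     * timestamps[i] is the (non-negative) time used to place VALUES[i] into a window.
     */
    static Map<Long, Integer> maxPerWindow(long[] timestamps) {
        Map<Long, Integer> result = new TreeMap<>();
        for (int i = 0; i < VALUES.length; i++) {
            long windowStart = timestamps[i] - (timestamps[i] % WINDOW_MS);
            result.merge(windowStart, VALUES[i], Math::max);
        }
        return result;
    }

    /** Event time: one record per second, fixed when the value was produced. */
    static long[] eventTimes() {
        long[] t = new long[VALUES.length];
        for (int i = 0; i < t.length; i++) {
            t[i] = i * 1_000L;
        }
        return t;
    }

    /** Wall-clock time of processing: a start time plus a fixed gap between records. */
    static long[] wallClockTimes(long startMs, long gapMs) {
        long[] t = new long[VALUES.length];
        for (int i = 0; i < t.length; i++) {
            t[i] = startMs + i * gapMs;
        }
        return t;
    }

    public static void main(String[] args) {
        // Live run: records are processed as they are produced, one per second.
        System.out.println("processing time, live:   " + maxPerWindow(wallClockTimes(0L, 1_000L)));
        // Replay: the same records are re-read from the topic 1 ms apart.
        System.out.println("processing time, replay: " + maxPerWindow(wallClockTimes(100_000L, 1L)));
        // Event time does not depend on when the records are processed.
        System.out.println("event time, any run:     " + maxPerWindow(eventTimes()));
    }
}
```

In the live run the processing-time result has two windows (max 26 and max
30); in the replay all ten records land in a single window, so the "final"
result changes. The event-time result is the same in both runs.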

Cheers,

Michał


On 16/06/17 17:54, Jay Kreps wrote:
> I think the question is: when do you actually /want/ processing-time 
> semantics? There are definitely times when it's safe to assume the two 
> are close enough that a little lossiness doesn't matter much, but it is 
> pretty hard to make assumptions about what the processing time is, and 
> it has been hard for us to think of a use case where it's actually 
> desirable.
>
> I think mostly what we've seen is confusion about the core concepts:
>
>   * stream -- immutable events that occur
>   * tables (including windows) -- current state of the world
>
> If the root problem is confusion, adding knobs never makes it better. 
> If the root problem is that we're missing important use cases that 
> justify the additional knobs, then I think it's good to try to really 
> understand them. I think there could be use cases around systems that 
> don't take updates; examples would be email, Twitter, and some metrics 
> stores.
>
> One solution that would induce less complexity than allowing new 
> semantics, but might help with the use cases we need to collect, would 
> be to add a new operator to the DSL: something like .freezeAfter(30, 
> TimeUnit.SECONDS) that collects all updates for a given window, emits 
> and enforces a single output 30 seconds after the advancement of 
> stream time, and remembers that it has been emitted, suppressing all 
> further output (so the output is actually a KStream). This might or 
> might not depend on wall-clock time. Perhaps this is in fact what you 
> are proposing?
>
> -Jay



Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Jay Kreps <ja...@confluent.io>.
I think the question is: when do you actually *want* processing-time
semantics? There are definitely times when it's safe to assume the two are
close enough that a little lossiness doesn't matter much, but it is pretty
hard to make assumptions about what the processing time is, and it has been
hard for us to think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world
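
As a rough illustration of these two concepts, the plain-Java sketch below
(no Kafka dependency) runs the sequence from the original question through
a running max. The class and method names are made up for this example.
Every event in the stream produces one update to the table's current
value, which is why five records go downstream rather than one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Kafka Streams code.
final class StreamVsTableSketch {
    /** Stream side: each immutable event yields one update of the aggregate. */
    static List<Integer> changelogOfMax(List<Integer> events) {
        List<Integer> updates = new ArrayList<>();
        int currentMax = Integer.MIN_VALUE;
        for (int value : events) {
            currentMax = Math.max(currentMax, value);
            updates.add(currentMax);
        }
        return updates;
    }

    /** Table side: the current state is simply the latest update (non-empty list assumed). */
    static int currentState(List<Integer> updates) {
        return updates.get(updates.size() - 1);
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(21, 25, 22, 20, 26);
        List<Integer> updates = changelogOfMax(events);
        System.out.println("updates sent downstream: " + updates);
        System.out.println("current table state:     " + currentState(updates));
    }
}
```

The list of updates is 21, 25, 25, 25, 26, matching what Paolo observed,
and the current state after the last event is 26, which is the single value
he expected per window.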

If the root problem is confusion, adding knobs never makes it better. If the
root problem is that we're missing important use cases that justify the
additional knobs, then I think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates;
examples would be email, Twitter, and some metrics stores.

One solution that would induce less complexity than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator to the DSL: something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window, emits and
enforces a single output 30 seconds after the advancement of stream time,
and remembers that it has been emitted, suppressing all further output (so
the output is actually a KStream). This might or might not depend on wall
clock time. Perhaps this is in fact what you are proposing?
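
A minimal sketch of what such an operator might do, in plain Java with no
Kafka dependency. Everything here is an assumption made for illustration:
freezeAfter does not exist in the DSL, and the class FreezeAfterSketch, its
methods, the max aggregation, the 5-second window and the 2-second delay are
invented. "Stream time" is modelled as the highest timestamp seen so far,
and a window is frozen once stream time reaches window end plus the delay.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of the proposed semantics; not a Kafka Streams API.
final class FreezeAfterSketch {
    private final long windowSizeMs;
    private final long freezeDelayMs;
    private final Map<Long, Integer> pending = new TreeMap<>(); // window start -> current max
    private final Set<Long> frozen = new HashSet<>();           // windows already emitted (grows unbounded here)
    private final List<String> output = new ArrayList<>();      // "windowStart=max", one entry per window
    private long streamTimeMs = Long.MIN_VALUE;

    FreezeAfterSketch(long windowSizeMs, long freezeDelayMs) {
        this.windowSizeMs = windowSizeMs;
        this.freezeDelayMs = freezeDelayMs;
    }

    /** Feed one record; timestamps are assumed to be non-negative. */
    void process(long timestampMs, int value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (!frozen.contains(windowStart)) {
            pending.merge(windowStart, value, Math::max);
        } // else: the window was already emitted, so the late update is suppressed
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        emitDueWindows();
    }

    /** Emit and freeze every pending window whose end plus delay has been reached. */
    private void emitDueWindows() {
        Iterator<Map.Entry<Long, Integer>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> entry = it.next();
            long windowEnd = entry.getKey() + windowSizeMs;
            if (streamTimeMs >= windowEnd + freezeDelayMs) {
                output.add(entry.getKey() + "=" + entry.getValue());
                frozen.add(entry.getKey());
                it.remove();
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        FreezeAfterSketch op = new FreezeAfterSketch(5_000L, 2_000L);
        int[] values = {21, 25, 22, 20, 26};
        for (int i = 0; i < values.length; i++) {
            op.process(i * 1_000L, values[i]);
        }
        op.process(6_000L, 30);  // stream time is still below 5000 + 2000
        System.out.println("after t=6000: " + op.output());
        op.process(7_000L, 18);  // first window is emitted once and frozen
        op.process(4_500L, 99);  // late record for the frozen window is dropped
        System.out.println("after t=7000: " + op.output());
    }
}
```

The trade-off is visible in the last call: the late value 99 belongs to the
first window by timestamp but never reaches the output, which is exactly
the loss that the "no final aggregation" model avoids.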

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
michal.borowiecki@openbet.com> wrote:

> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple times,
> right?
>
> Personally, I agree totally with the philosophy of "no final aggregation",
> as expressed by Eno's post, but IMO that is predicated totally on
> event-time semantics.
>
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record just
> falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
>
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is another
> case where processing-time semantics warrant explicit handling in the api -
> but of course, only if there's sufficient user demand for this.
>
> What I could imagine is a new type of time window (ProcessingTimeWindow?).
> If used in an aggregation, the underlying processor would force the
> WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the
> system-time punctuation (KIP-138) to send the final aggregation value once
> the window has expired. It could be configured to not send intermediate
> updates while the window is open.
>
> Of course this is just a helper for the users, since they can implement it
> all themselves using the low-level API, as Matthias pointed out already.
> Just seems there's recurring interest in this.
>
> Again, this only makes sense for processing time semantics. For event-time
> semantics I find the arguments for "no final aggregation" totally
> convincing.
>
>
> Cheers,
>
> Michał
>
> On 16/06/17 00:08, Matthias J. Sax wrote:
>
> Hi Paolo,
>
> This SO question might help, too: https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>
> For Streams, the basic model is based on "change" and we report updates
> to the "current" result immediately reducing latency to a minimum.
>
> Last, if you say it's going to fall into the next window, you won't get
> event-time semantics; you fall back to processing-time semantics, which
> cannot provide exact results.
>
> If you really want to trade off correctness versus getting (late)
> updates and want to use processing-time semantics, you should configure
> WallclockTimestampExtractor and implement an "update deduplication"
> operator using table.toStream().transform(). You can attach a state
> store to your transformer and store all updates there (i.e., newer
> updates overwrite older updates). Punctuations allow you to emit "final"
> results for windows whose "window end time" has passed.
>
>
> -Matthias
>
> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>
> Hi Eno,
>
>
> regarding closing windows, I think that it's up to the streaming application. I mean ...
>
> If I want something like what I described, I know that a value outside my 5-second window will be taken into account in the next processing round (in the next 5 seconds). I don't think I'm losing a record; I am aware that this record will fall into the next "processing" window. Btw I'll take a look at your article! Thanks!
>
>
> Paolo
>
>
> ________________________________
> From: Eno Thereska <en...@gmail.com>
> Sent: Thursday, June 15, 2017 3:57 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> Yeah, so if you want fewer records, you should actually "not" disable cache. If you disable cache you'll get all the records as you described.
>
> About closing windows: if you close a window and a late record arrives that should have been in that window, you basically lose the ability to process that record. Kafka Streams is robust to that, in that it handles late-arriving records. There is a comparison with other methods that depend on watermarks or triggers here: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>
> Eno
>
>
>
> On 15 Jun 2017, at 14:57, Paolo Patierno <pp...@live.com> wrote:
>
> Hi Eno,
>
>
> thanks for the reply!
>
> Regarding the cache, I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so disabling the cache).
>
> Regarding the interactive query API (I'll take a look), it means that it's up to the application to do something that we get out of the box with Spark.
>
> May I ask what you mean by "We don’t believe in closing windows"? Isn't it much more code that the user has to write to get the same result?
>
> I'm exploring Kafka Streams and it's very powerful imho, also because the usage is pretty simple, but in this scenario it could be lacking compared to Spark.
>
>
> Thanks,
>
> Paolo.
>
>
> ________________________________
> From: Eno Thereska <en...@gmail.com>
> Sent: Thursday, June 15, 2017 1:45 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
> You could reduce the number of downstream records by using record caches: http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>
> Alternatively, you can just query the KTable whenever you want using the Interactive Query APIs (so when you query dictates what data you receive); see https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Thanks
> Eno
>
> On Jun 15, 2017, at 2:38 PM, Paolo Patierno <pp...@live.com> wrote:
>
> Hi,
>
>
> using the streams library I noticed a difference (or there is a lack of knowledge on my side) compared with Apache Spark.
>
> Imagine the following scenario ...
>
>
> I have a source topic where numeric values come in, and I want to compute the maximum value over the last 5 seconds but ... write that max value to a destination topic only every 5 seconds.
>
> This is what happens with the reduceByWindow method in Spark.
>
> I'm using reduce on a KStream here, which computes the max value taking into account the previous values in the last 5 seconds, but the result is written to the destination topic for each incoming value.
>
>
> For example ...
>
>
> An application sends numeric values every 1 second.
>
> With Spark ... the source gets values every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 5 seconds (so when the window ends). If the sequence is 21, 25, 22, 20, 26, the output will be just 26.
>
> With Kafka Streams ... the source gets values every 1 second, computes the max over a window of 5 seconds, and puts the max into the destination every 1 second (so every time an incoming value arrives). Of course, if for example the sequence is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.
>
>
> Is the Spark behaviour possible with Kafka Streams? Or is it something to do at the application level?
>
>
> Thanks,
>
> Paolo

Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks Michał!

That is very good feedback.


-Matthias

On 6/16/17 2:38 AM, Michal Borowiecki wrote:
> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple
> times, right?
> 
> Personally, I agree totally with the philosophy of "no final
> aggregation", as expressed by Eno's post, but IMO that is predicated
> totally on event-time semantics.
> 
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record
> just falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
> 
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is
> another case where processing-time semantics warrant explicit handling
> in the api - but of course, only if there's sufficient user demand for this.
> 
> What I could imagine is a new type of time window
> (ProcessingTimeWindow?), that if used in an aggregation, the underlying
> processor would force the WallclockTimestampExtractor (KAFKA-4144
> enables that) and would use the system-time punctuation (KIP-138) to
> send the final aggregation value once the window has expired and could
> be configured to not send intermediate updates while the window was open.
> 
> Of course this is just a helper for the users, since they can implement
> it all themselves using the low-level API, as Matthias pointed out
> already. Just seems there's recurring interest in this.
> 
> Again, this only makes sense for processing time semantics. For
> event-time semantics I find the arguments for "no final aggregation"
> totally convincing.
> 
> 
> Cheers,
> 
> Michał


Re: Kafka Streams vs Spark Streaming : reduce by window

Posted by Jay Kreps <ja...@confluent.io>.
I think the question is when do you actually *want* processing time
semantics? There are definitely times when it's safe to assume the two are
close enough that a little lossiness doesn't matter much, but it is pretty
hard to make assumptions about when the processing time is, and it has been
hard for us to think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world
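
As a minimal sketch of these two concepts, using the values from Paolo's
example (21, 25, 22, 20, 26, one per second, 5-second window): the stream
is the sequence of events, and the table holds the current max per window.
The class and method names are hypothetical, and this is plain Java rather
than the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StreamTableSketch {

    // Folds a stream of (timestamp, value) events into a table keyed by window
    // start. Returns the changelog: the table row's value after each event.
    static List<Integer> changelog(long[] timestampsMs, int[] values,
                                   long windowSizeMs, Map<Long, Integer> table) {
        List<Integer> updates = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            // Assumes non-negative timestamps and tumbling (non-overlapping) windows.
            long windowStart = timestampsMs[i] - (timestampsMs[i] % windowSizeMs);
            int max = table.merge(windowStart, values[i], Integer::max);
            updates.add(max);
        }
        return updates;
    }

    public static void main(String[] args) {
        long[] timestampsMs = {0, 1000, 2000, 3000, 4000};
        int[] values = {21, 25, 22, 20, 26};
        Map<Long, Integer> table = new TreeMap<>();
        // Every event produces an update to the "current" result.
        System.out.println(changelog(timestampsMs, values, 5000, table)); // [21, 25, 25, 25, 26]
        // The table itself only holds the latest state per window.
        System.out.println(table); // {0=26}
    }
}
```

The changelog is what a downstream topic sees when every update is
forwarded; the table content is what a query would return at that moment.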

If the root problem is confusion, adding knobs never makes it better. If the
root problem is that we're missing important use cases that justify the
additional knobs, then I think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates;
examples would be email, Twitter, and some metrics stores.

One solution that would induce less complexity than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator in the DSL. Something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window, both
emits and enforces a single output 30 seconds after the advancement
of stream time, and remembers that it was emitted, suppressing all further
output (so the output is actually a KStream). This might or might not
depend on wall clock time. Perhaps this is in fact what you are proposing?
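
A rough sketch of how such an operator could behave, under stated
assumptions: freezeAfter is only a proposed name in this thread, so the
class below is hypothetical plain Java with no Kafka dependency. It assumes
tumbling windows, a max aggregation, non-negative timestamps, and that
"stream time" is the highest record timestamp seen so far. A window is
frozen once stream time reaches window end plus the configured delay.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class FreezeAfterSketch {
    private final long windowSizeMs;
    private final long freezeAfterMs;
    // Open windows: window start -> current max, sorted by window start.
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    // Windows whose single result was already emitted (grows unbounded here).
    private final Set<Long> frozen = new HashSet<>();
    private long streamTimeMs = Long.MIN_VALUE;

    FreezeAfterSketch(long windowSizeMs, long freezeAfterMs) {
        this.windowSizeMs = windowSizeMs;
        this.freezeAfterMs = freezeAfterMs;
    }

    // Processes one record and returns the final results emitted, if any.
    List<Integer> process(long timestampMs, int value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (!frozen.contains(windowStart)) {
            open.merge(windowStart, value, Integer::max);
        } // else: the window was already emitted, so the update is suppressed

        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        List<Integer> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Integer>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> entry = it.next();
            long freezeAtMs = entry.getKey() + windowSizeMs + freezeAfterMs;
            if (freezeAtMs > streamTimeMs) {
                break; // windows are sorted, so later ones freeze later
            }
            emitted.add(entry.getValue());
            frozen.add(entry.getKey());
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        FreezeAfterSketch op = new FreezeAfterSketch(5000, 0);
        long[] timestampsMs = {0, 1000, 2000, 3000, 4000, 5000, 4500};
        int[] values = {21, 25, 22, 20, 26, 7, 99};
        for (int i = 0; i < values.length; i++) {
            System.out.println(timestampsMs[i] + " -> " + op.process(timestampsMs[i], values[i]));
        }
    }
}
```

With a delay of 0, only the record at timestamp 5000 produces output (the
single value 26 for the first window), and the late record at 4500 is
dropped because its window is already frozen. That dropped record is the
cost of this trade-off.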

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
michal.borowiecki@openbet.com> wrote:

> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple times,
> right?
>
> Personally, I agree totally with the philosophy of "no final aggregation",
> as expressed by Eno's post, but IMO that is predicated totally on
> event-time semantics.
>
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record just
> falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
>
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is another
> case where processing-time semantics warrant explicit handling in the api -
> but of course, only if there's sufficient user demand for this.
>
> What I could imagine is a new type of time window (ProcessingTimeWindow?),
> that if used in an aggregation, the underlying processor would force the
> WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the
> system-time punctuation (KIP-138) to send the final aggregation value once
> the window has expired and could be configured to not send intermediate
> updates while the window was open.
>
> Of course this is just a helper for the users, since they can implement it
> all themselves using the low-level API, as Matthias pointed out already.
> Just seems there's recurring interest in this.
>
> Again, this only makes sense for processing time semantics. For event-time
> semantics I find the arguments for "no final aggregation" totally
> convincing.
>
>
> Cheers,
>
> Michał