Posted to users@kafka.apache.org by Benjamin Black <be...@gmail.com> on 2017/01/04 20:48:49 UTC

Aggregated windowed counts

Hello,

I'm looking for guidance on how to approach a counting problem. We want to
consume a stream of data that consists of IDs and generate an output of the
aggregated count with a window size of X seconds using processing time and
a hopping time window. For example, using a window size of 1 second, if we
get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=1, 2=3. If
we get IDs 1, 3, 3 in the 2nd second then the output would be 1=1, 3=2. The
aggregated count will then be turned into increment commands to a cache and
a database.

Obviously we will need some state to be stored during the count of a
window, but we only need to keep it for the time period of the window (i.e.
a second). I was thinking this could be achieved by using a persistent
store, where the counts are reset during the punctuate and the store topic
uses log compaction. Alternatively, we could simply have an in-memory
store that is reset during the punctuate. My concern with the in-memory
store is that I don't know when the input topic offset is committed or when
the output data is written, and therefore we could lose data. Ultimately, at
the end of the second, the input offset and output data should be written
at the same time, reducing the likelihood of lost data. We would rather
lose data than have duplicate counts. What is the correct approach? Is
there a better way of tackling the problem?

I have put together some code, but it doesn't do exactly what I expect. I'm
happy to share if it helps.
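
Roughly, the shape I have in mind is something like the Processor sketch
below (names like "counts-store" and the 1-second schedule are made up for
illustration; as far as I can tell, punctuate is driven by stream time rather
than wall-clock time, so this only approximates processing time):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WindowedCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> counts;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.counts = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
        context.schedule(1000L); // request a punctuate roughly once per second
    }

    @Override
    public void process(final String key, final String id) {
        // increment the count for this ID within the current window
        final Long current = counts.get(id);
        counts.put(id, current == null ? 1L : current + 1L);
    }

    @Override
    public void punctuate(final long timestamp) {
        // emit all counts for the closed window, then reset the store
        final KeyValueIterator<String, Long> iter = counts.all();
        while (iter.hasNext()) {
            final KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value);
            counts.delete(entry.key);
        }
        iter.close();
        context.commit(); // request a commit of the task's state and input offsets
    }

    @Override
    public void close() {}
}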

Thanks,
Ben

Re: Aggregated windowed counts

Posted by "Matthias J. Sax" <ma...@confluent.io>.
On a clean restart on the same machine, the local RocksDB will just be
reused as it contains the complete state. Thus there is no need to read
the changelog topic at all.

The changelog topic is only read when a state is moved from one node to
another, or when the state got corrupted due to a failure (i.e., the
recovery case).

For both of those cases, the whole changelog will be consumed. This might
take some time. But keep in mind that a changelog topic uses a
"compaction policy", so it will eventually contain only a single entry
per window -- additionally, a "retention policy" is applied and entries
are deleted after the window retention time expires.
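
If I recall correctly, the retention time can be set on the window
definition itself, e.g. (a sketch only; one minute is an arbitrary value):

.count(TimeWindows.of(5000L).until(60 * 1000L), "Counts")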

If the changelog topic is still too large and rebuilding the state
takes too long, you can configure StandbyTasks, which continuously rebuild
the state on a different machine in the background (they do not do any
actual processing). In case of failure, the StandbyTasks will be used to
quickly recreate the failed tasks.
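
Enabling them is a single config (a minimal sketch; the other properties
are whatever you already use):

Properties props = new Properties();
// ... application id, bootstrap servers, serdes, etc. ...
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // one standby copy per state store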

See:
http://docs.confluent.io/current/streams/architecture.html#fault-tolerance


-Matthias


Re: Aggregated windowed counts

Posted by Benjamin Black <be...@gmail.com>.
I understand now. The commit triggers the output of the window data,
whether or not the window is complete. For example, if I use .print() as
you suggest:

[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631920000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631930000] , (2<-null)

The second line is an intermediate result of the third line. I suppose this
is fine if we are storing the count with the time window, but not if we are
trying to do a total count of each word. I'm guessing the only option is a
handcrafted solution using the lower-level API, as suggested in the
Stack Overflow post.

I have another question concerning how the Counts KTable data is stored. If
I understand correctly, on restart the process will re-create the state of
the KTable by reading the Counts changelog topic
(wordcount-lambda-example-Counts-changelog) from the beginning. Over time,
wouldn't this be a lot of data? Or is there some mechanism to read only from
a position near the end?


Re: Aggregated windowed counts

Posted by "Matthias J. Sax" <ma...@confluent.io>.
There is no such thing as a final window aggregate, and you might see
intermediate results -- thus the counts do not add up.

Please have a look here:

http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277

and here:

http://docs.confluent.io/current/streams/developer-guide.html#memory-management


On each commit, the current intermediate result will be flushed from the
de-duplication cache -- thus, for a smaller commit interval you see more
intermediate results, and the totals seem to be further off.
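
Both knobs are plain StreamsConfig settings, e.g. (values are only for
illustration):

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000); // commit (and flush) every 30 seconds
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB de-duplication cache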

In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
keep it, you can see which result records belong to the same window. The
simplest way for testing would be to use .print() instead of .toStream()
to see the key as the window-id plus the record key.
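
For example, assuming the windowed KTable from your snippet is assigned to
a variable counts, the window start can be folded into the output key
instead of being dropped (a sketch):

KStream<String, Long> perWindowCounts = counts.toStream(
    (windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start());
perWindowCounts.print();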


-Matthias


Re: Aggregated windowed counts

Posted by Benjamin Black <be...@gmail.com>.
I'm hoping the DSL will do what I want :) Currently the example is
continuously adding instead of bucketing, so if I modify it by adding a
window to the count function:

.groupBy((key, word) -> word)
.count(TimeWindows.of(5000L), "Counts")
.toStream((k, v) -> k.key());

Then I do see bucketing happening. However, it isn't accurate. For example,
I type "kafka" into the console as 20 separate sentences, but the output I get is:

kafka 4
kafka 9
kafka 2
kafka 7

Which adds up to 22. What am I doing wrong? What is the relationship between
the commit interval and the time window? The smaller I make the commit
interval, the less accurate it becomes.


Re: Aggregated windowed counts

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Do you know about Kafka Streams? Its DSL gives you exactly what you
want to do.
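
Applied to your ID example, it would look roughly like this (a sketch;
topic and store names are made up):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> ids = builder.stream("ids-topic");

KTable<Windowed<String>, Long> counts = ids
    .groupBy((key, id) -> id)                  // group by the ID itself
    .count(TimeWindows.of(1000L), "IdCounts"); // 1-second windows

counts.toStream().print(); // inspect windowed keys and their counts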

Check out the documentation and WordCount example:

http://docs.confluent.io/current/streams/index.html
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java


Let us know if you have further questions.


-Matthias
