Posted to users@kafka.apache.org by Sicheng Liu <lz...@gmail.com> on 2018/07/10 02:27:45 UTC

flatMapValues does not calculate timestamp for each record generated

Hi,

I found that doing windowed aggregation on records generated by
flatMapValues gives incorrect results.

Take this topology as an example:

myStream.flatMapValues(...)
                .groupByKey(...)
                .aggregate(...)

Inside KStreamWindowAggregateProcessor, the timestamp used to query the
aggregation window is retrieved via context.timestamp(). However,
flatMapValues does not reset the recordContext, which gives wrong results
when the new records generated by flatMapValues should fall into
different windows.

I am not sure whether this is a bug or by design.

It feels to me that flatMapValues is somewhat counter-intuitive in this
case, and the aggregate processor should extract timestamps from the
records being aggregated instead of retrieving them from the context.
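To make the failure mode concrete, here is a minimal sketch of tumbling-window
assignment by timestamp (class and method names are illustrative, not Kafka
internals): two metrics whose own timestamps are 7 minutes apart belong in
different 5-minute windows, but if both inherit a single context timestamp
they land in the same one.

```java
// Illustrative sketch, not Kafka internals: tumbling-window assignment
// depends only on the timestamp the window processor sees for a record.
public class WindowAssignment {

    // Start of the tumbling window of size windowSizeMs containing timestampMs.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;

        // Using each metric's own timestamp: two different windows.
        System.out.println(windowStart(0L, fiveMinutes));             // 0
        System.out.println(windowStart(7 * 60 * 1000L, fiveMinutes)); // 300000

        // Using the single inherited context timestamp: one window for both.
        long inherited = 0L;
        System.out.println(windowStart(inherited, fiveMinutes));      // 0
    }
}
```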

Thanks,
Sicheng

Re: flatMapValues does not calculate timestamp for each record generated

Posted by "Matthias J. Sax" <ma...@confluent.io>.
By default, Kafka uses the record metadata timestamp. Thus, I assume
that you are using a custom timestamp extractor? If so, which timestamp do
you extract, since it seems you have multiple?

To make it work, you will need to write the data back to Kafka into a
new topic after the flatMap() and read each record back individually.
This allows you, via a custom timestamp extractor, to get the timestamp
from the value.

stream.flatMap(...).to("myTopic");
builder.stream("myTopic",
        Consumed.with(/* pass in custom timestamp extractor */));
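A minimal sketch of the value-side parsing such a custom extractor could
delegate to, assuming (purely for illustration) that each record value begins
with the event timestamp as a big-endian long of epoch milliseconds; a real
implementation would implement Kafka's TimestampExtractor interface and apply
this parsing to the ConsumerRecord's value.

```java
import java.nio.ByteBuffer;

// Sketch only: assumes the record value starts with the event timestamp
// encoded as a big-endian long of epoch milliseconds. A real extractor
// would implement org.apache.kafka.streams.processor.TimestampExtractor
// and apply this parsing inside extract(record, previousTimestamp).
public class ValueTimestampParser {

    public static long extract(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        byte[] value = ByteBuffer.allocate(16)
                .putLong(1531188465000L) // event timestamp (epoch millis)
                .putLong(42L)            // metric value
                .array();
        System.out.println(extract(value)); // 1531188465000
    }
}
```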



Hope this helps.

-Matthias

On 7/10/18 10:32 AM, Sicheng Liu wrote:
> This is a specific example:
> 
> We are sending metrics to Kafka Stream with the following layout:
> 
> Record Key:
> ----------------------------------
> |    metric name    |    tags    |
> ----------------------------------
> 
> Record Value:
> --------------------------------------------------------------------------------------
> |  timestamp1  |  value1  |  timestamp2  |  value2  |  ...  |  timestampN  |  valueN  |
> --------------------------------------------------------------------------------------
> 
> The record value contains a list of timestamps and values. The timestamps
> may have a range of 10 minutes.
> 
> You may ask why we do this: the reason is to save Kafka disk space by
> not sending the large record key repeatedly. (We know Kafka can do
> compression on the producer side, but this format combined with
> compression uses less space.)
> 
> So, in this case, we want to aggregate metrics into 5-minute windows, and
> we used flatMapValues to flatten the metric series into individual metrics:
> 
> myStream.flatMapValues(...)
>                 .groupByKey(...)
>                 .aggregate(...)
> 
> But with flatMapValues, all the flattened metrics share the same timestamp,
> so they are all aggregated into the same window (instead of two), which
> gives an incorrect result.
> 
> When I looked at the API documentation, I had the impression that it would
> behave like sending individual metrics.
> 
> Hopefully this gives a better explanation of what I am trying to achieve.
> 
> Thanks,
> Sicheng
> 
> On Tue, Jul 10, 2018 at 9:32 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Not sure what you mean by "does not reset recordContext".
>>
>> Note that the "contract" for `flatMapValues` is that the output records
>> inherit the timestamp of the input record.
>>
>> I'm not sure what behavior you expect; maybe you can elaborate?
>>
>>
>> -Matthias
>>
>> On 7/9/18 7:27 PM, Sicheng Liu wrote:
>>> Hi,
>>>
>>> I found that doing windowed aggregation on records generated by
>>> flatMapValues gives incorrect results.
>>>
>>> Take this topology as an example:
>>>
>>> myStream.flatMapValues(...)
>>>                 .groupByKey(...)
>>>                 .aggregate(...)
>>>
>>> Inside KStreamWindowAggregateProcessor, the timestamp used to query the
>>> aggregation window is retrieved via context.timestamp(). However,
>>> flatMapValues does not reset the recordContext, which gives wrong results
>>> when the new records generated by flatMapValues should fall into
>>> different windows.
>>>
>>> I am not sure whether this is a bug or by design.
>>>
>>> It feels to me that flatMapValues is somewhat counter-intuitive in this
>>> case, and the aggregate processor should extract timestamps from the
>>> records being aggregated instead of retrieving them from the context.
>>>
>>> Thanks,
>>> Sicheng
>>>
>>
>>
> 


Re: flatMapValues does not calculate timestamp for each record generated

Posted by Sicheng Liu <lz...@gmail.com>.
This is a specific example:

We are sending metrics to Kafka Stream with the following layout:

Record Key:
----------------------------------
|    metric name    |    tags    |
----------------------------------

Record Value:
--------------------------------------------------------------------------------------
|  timestamp1  |  value1  |  timestamp2  |  value2  |  ...  |  timestampN  |  valueN  |
--------------------------------------------------------------------------------------

The record value contains a list of timestamps and values. The timestamps
may have a range of 10 minutes.

You may ask why we do this: the reason is to save Kafka disk space by
not sending the large record key repeatedly. (We know Kafka can do
compression on the producer side, but this format combined with
compression uses less space.)

So, in this case, we want to aggregate metrics into 5-minute windows, and
we used flatMapValues to flatten the metric series into individual metrics:

myStream.flatMapValues(...)
                .groupByKey(...)
                .aggregate(...)

But with flatMapValues, all the flattened metrics share the same timestamp,
so they are all aggregated into the same window (instead of two), which
gives an incorrect result.
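For illustration, assuming the packed value is already decoded into
alternating timestamps and values, the flattening a ValueMapper passed to
flatMapValues performs can be sketched in plain Java. The point is that each
flattened element carries its own timestamp in the payload, yet Kafka Streams
stamps every output record with the input record's timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the flattening done inside flatMapValues. Assumes
// (for illustration only) that the packed value alternates timestamps and
// values: [ts1, v1, ts2, v2, ...]. Each output element keeps its own
// timestamp in the payload, but Kafka Streams still stamps every output
// record with the *input* record's timestamp, so all of them fall into
// the same aggregation window.
public class MetricFlattener {

    // Split the packed series into individual {timestamp, value} pairs.
    public static List<long[]> flatten(long[] packed) {
        List<long[]> out = new ArrayList<>();
        for (int i = 0; i + 1 < packed.length; i += 2) {
            out.add(new long[] { packed[i], packed[i + 1] });
        }
        return out;
    }

    public static void main(String[] args) {
        // Two points 7 minutes apart inside one packed value.
        long[] packed = { 0L, 10L, 420000L, 20L };
        System.out.println(flatten(packed).size()); // 2
    }
}
```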

When I looked at the API documentation, I had the impression that it would
behave like sending individual metrics.

Hopefully this gives a better explanation of what I am trying to achieve.

Thanks,
Sicheng

On Tue, Jul 10, 2018 at 9:32 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Not sure what you mean by "does not reset recordContext".
>
> Note that the "contract" for `flatMapValues` is that the output records
> inherit the timestamp of the input record.
>
> I'm not sure what behavior you expect; maybe you can elaborate?
>
>
> -Matthias
>
> On 7/9/18 7:27 PM, Sicheng Liu wrote:
> > Hi,
> >
> > I found that doing windowed aggregation on records generated by
> > flatMapValues gives incorrect results.
> >
> > Take this topology as an example:
> >
> > myStream.flatMapValues(...)
> >                 .groupByKey(...)
> >                 .aggregate(...)
> >
> > Inside KStreamWindowAggregateProcessor, the timestamp used to query the
> > aggregation window is retrieved via context.timestamp(). However,
> > flatMapValues does not reset the recordContext, which gives wrong results
> > when the new records generated by flatMapValues should fall into
> > different windows.
> >
> > I am not sure whether this is a bug or by design.
> >
> > It feels to me that flatMapValues is somewhat counter-intuitive in this
> > case, and the aggregate processor should extract timestamps from the
> > records being aggregated instead of retrieving them from the context.
> >
> > Thanks,
> > Sicheng
> >
>
>

Re: flatMapValues does not calculate timestamp for each record generated

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Not sure what you mean by "does not reset recordContext".

Note that the "contract" for `flatMapValues` is that the output records
inherit the timestamp of the input record.

I'm not sure what behavior you expect; maybe you can elaborate?


-Matthias

On 7/9/18 7:27 PM, Sicheng Liu wrote:
> Hi,
> 
> I found that doing windowed aggregation on records generated by
> flatMapValues gives incorrect results.
> 
> Take this topology as an example:
> 
> myStream.flatMapValues(...)
>                 .groupByKey(...)
>                 .aggregate(...)
> 
> Inside KStreamWindowAggregateProcessor, the timestamp used to query the
> aggregation window is retrieved via context.timestamp(). However,
> flatMapValues does not reset the recordContext, which gives wrong results
> when the new records generated by flatMapValues should fall into
> different windows.
> 
> I am not sure whether this is a bug or by design.
> 
> It feels to me that flatMapValues is somewhat counter-intuitive in this
> case, and the aggregate processor should extract timestamps from the
> records being aggregated instead of retrieving them from the context.
> 
> Thanks,
> Sicheng
>