Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2016/11/29 16:17:45 UTC

KStream window - end value out of bounds

Using the following topology:

KStream<String, SumRecord> kStream =
        kStreamBuilder.stream(stringSerde, transSerde, TOPIC);
KTable<Windowed<String>, SumRecordCollector> ktAgg =
        kStream.groupByKey().aggregate(
                SumRecordCollector::new,
                new Aggregate(),
                // 20-minute windows that advance every minute
                TimeWindows.of(20 * 60 * 1000).advanceBy(60 * 1000),
                cSerde, "table_stream");


When looking at windows as follows:

ktAgg.toStream().foreach((postKey, postValue) -> {
    LOGGER.debug("start: {}   end: {}",
            postKey.window().start(), postKey.window().end());
});

The 'start' values are coming through properly incremented but the 'end'
values are all 9223372036854775807.

Is there something wrong with my topology? Some other bug that would cause
this?

Re: KStream window - end value out of bounds

Posted by Guozhang Wang <wa...@gmail.com>.
I agree that we should fix the "end timestamp" in windows after calling
WindowedDeserializer; I created
https://issues.apache.org/jira/browse/KAFKA-4468 for it.

As for Jon's observation that some records seem to be aggregated into
incorrect windows: what was the unexpected behavior you observed?


Guozhang


Re: KStream window - end value out of bounds

Posted by Jon Yeargers <jo...@cedexis.com>.
Seems straightforward enough: I have a 'foreach' after my windowed
aggregation and I see values like these come out:

(window) start: 1480444200000  end: 1480445400000

 (record) epoch='1480433282000'


If I have a 20 minute window with a 1 minute 'step' I will see my record
come out of the aggregation 20x - with different window start/end.
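To make the expectation concrete, here is a minimal sketch in plain Java (no Kafka dependency) that lists which hopping windows a given timestamp should fall into for a 20 minute size and 1 minute advance. The class and method names are hypothetical. It assumes windows are aligned to epoch zero and that the timestamp used for windowing is the 'epoch' value shown above; Kafka Streams actually uses whatever the configured timestamp extractor returns, which may be a different value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class HoppingWindowsExample {
    static final long SIZE_MS = 20 * 60 * 1000L;  // window size from the topology
    static final long ADVANCE_MS = 60 * 1000L;    // hop ("step") from the topology

    // Start times of every window [start, start + SIZE_MS) containing timestampMs.
    // Assumes epoch-aligned windows and a non-negative timestamp.
    static List<Long> windowStartsFor(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        long latest = timestampMs - (timestampMs % ADVANCE_MS);
        long earliest = Math.max(0L, latest - SIZE_MS + ADVANCE_MS);
        for (long start = earliest; start <= latest; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long recordEpoch = 1480433282000L;          // record value quoted above
        long observedWindowStart = 1480444200000L;  // window start quoted above
        List<Long> starts = windowStartsFor(recordEpoch);
        System.out.println("windows: " + starts.size());
        System.out.println("first start: " + starts.get(0));
        System.out.println("last start: " + starts.get(starts.size() - 1));
        System.out.println("observed window expected: "
                + starts.contains(observedWindowStart));
    }
}
```

Under those assumptions the record belongs to 20 windows, none of which starts at 1480444200000, which is the mismatch I am describing.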


Re: KStream window - end value out of bounds

Posted by Eno Thereska <en...@gmail.com>.
Let us know if we can help with that. What problems are you seeing with records in the wrong windows?

Eno



Re: KStream window - end value out of bounds

Posted by Jon Yeargers <jo...@cedexis.com>.
I've been having problems with records appearing in windows that they
clearly don't belong to. Was curious whether this was related but it seems
not. Bummer.


Re: KStream window - end value out of bounds

Posted by Eno Thereska <en...@gmail.com>.
Hi Jon,

There is an optimization in org.apache.kafka.streams.kstream.internals.WindowedSerializer/Deserializer where we don't encode and decode the end of the window since the user can always calculate it. So instead we return a default of Long.MAX_VALUE, which is the big number you see.

In other words, use window().start() but not window().end() in this case. If you want to print both, just add the window size to window().start().

Thanks
Eno
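
As a rough illustration of the workaround, the sketch below derives the end of a window from its start and the configured window size. It is plain Java with no Kafka dependency; the class name, method name, and constant are hypothetical, and the size is assumed to match the TimeWindows.of(20 * 60 * 1000) call in the original post.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class WindowEndExample {
    // Assumed to equal the size passed to TimeWindows.of(...) in the topology.
    static final long WINDOW_SIZE_MS = 20 * 60 * 1000L;

    // Computes the window end instead of trusting the deserialized end value.
    static long windowEnd(long windowStartMs) {
        return windowStartMs + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long start = 1480444200000L; // a window start value seen in this thread
        System.out.println("start: " + start + "   end: " + windowEnd(start));
    }
}
```

Inside the foreach, the same idea would mean logging postKey.window().start() + WINDOW_SIZE_MS rather than postKey.window().end().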