Posted to users@kafka.apache.org by Jonathan Santilli <jo...@gmail.com> on 2019/07/05 13:25:02 UTC

Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Thanks a lot, Bill, for creating the issue. I have updated it with a bit
more info.

Cheers!
--
Jonathan




On Fri, Jun 28, 2019 at 9:21 PM Bill Bejeck <bi...@confluent.io> wrote:

> Jonathan, Matthias
>
> I've created a Jira for this issue
> https://issues.apache.org/jira/browse/KAFKA-8615.
>
> Jonathan, I plan to work on this when I get back from vacation on 7/8. If
> you would like to work on this yourself before then, feel free to do so and
> assign the ticket to yourself.
>
> Thanks,
> Bill
>
> On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Sounds like a regression to me.
> >
> > We did change some code to track partition time differently. Can you
> > open a Jira?
> >
> >
> > -Matthias
> >
> > On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> > > Sure, Bill. It is the same code I used when I reported the issue with
> > > suppress some months ago:
> > >
> > > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> > >
> > > At that time, I reported that after restarting the app, suppress was
> > > sending already-processed records downstream again. Now, with version
> > > 2.2.1+, after restarting the app, the aggregation/suppress step (I do
> > > not know exactly which) is missing some records that should be
> > > aggregated, even though they are in the input topic.
> > >
> > > Kafka version 2.3:
> > >
> > > public class OwnTimeExtractor implements TimestampExtractor {
> > >
> > >     @Override
> > >     public long extract(final ConsumerRecord<Object, Object> record,
> > >                         final long previousTimestamp) {
> > >         // previousTimestamp is always == -1
> > >         ...
> > >     }
> > > }
> > >
> > > final StreamsBuilder builder = new StreamsBuilder();
> > > final KStream<..., ...> events = builder
> > >         .stream(inputTopicNames,
> > >                 Consumed.with(..., ...)
> > >                         .withTimestampExtractor(new OwnTimeExtractor()));
> > >
> > > events
> > >     .filter((k, v) -> ...)
> > >     .flatMapValues(v -> ...)
> > >     .flatMapValues(v -> ...)
> > >     .selectKey((k, v) -> v)
> > >     .groupByKey(Grouped.with(..., ...))
> > >     // advanceBy == window size, so these are tumbling (non-overlapping) windows
> > >     .windowedBy(
> > >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> > >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> > >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> > >     .reduce((agg, newValue) -> {
> > >         ...
> > >         return agg;
> > >     })
> > >     // emit only the final result per window, once stream time passes window end + grace
> > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >     .toStream()
> > >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> > >
> > >
> > >
> > > On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bi...@confluent.io> wrote:
> > >
> > >> Thanks for the reply, Jonathan.
> > >>
> > >> Are you in a position to share your code so I can try to reproduce it
> > >> on my end?
> > >>
> > >> -Bill
> > >>
> > >>
> > >> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> > >> jonathansantilli@gmail.com> wrote:
> > >>
> > >>> Hello Bill,
> > >>>
> > >>> I am implementing the TimestampExtractor interface and then using it
> > >>> when consuming, like this:
> > >>>
> > >>> final KStream<..., ...> events = builder.stream(inputTopicList,
> > >>>         Consumed.with(keySerde, valueSerde)
> > >>>                 .withTimestampExtractor(new OwnTimeExtractor(...)));
> > >>>
> > >>> I am not setting the default.timestamp.extractor config value.
> > >>>
> > >>> Cheers!
> > >>> --
> > >>> Jonathan
> > >>>
> > >>>
> > >>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bi...@confluent.io> wrote:
> > >>>
> > >>>> Hi Jonathan,
> > >>>>
> > >>>> Thanks for reporting this.  Which timestamp extractor are you using
> > >>>> in the configs?
> > >>>>
> > >>>> Thanks,
> > >>>> Bill
> > >>>>
> > >>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> > >>>> jonathansantilli@gmail.com> wrote:
> > >>>>
> > >>>>> Hello, I hope you are all doing well.
> > >>>>>
> > >>>>> I am testing the new version 2.3, specifically Kafka Streams. I have
> > >>>>> noticed that, in my implementation of the extract method from the
> > >>>>> interface org.apache.kafka.streams.processor.TimestampExtractor
> > >>>>>
> > >>>>> public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp)
> > >>>>>
> > >>>>>
> > >>>>> the previousTimestamp value is now always -1.
> > >>>>>
> > >>>>>
> > >>>>> The previous version, 2.2.1, passed the correct value for the
> > >>>>> record's partition.
> > >>>>>
> > >>>>> I am aware the interface is marked as @InterfaceStability.Evolving
> > >>>>> and that we should not rely on its stability/compatibility. I am just
> > >>>>> wondering whether this new behavior is intentional or a bug.
> > >>>>>
> > >>>>>
> > >>>>> Cheers!
> > >>>>> --
> > >>>>> Santilli Jonathan
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> Santilli Jonathan
> > >>>
> > >>
> > >
> > >
> >
> >
>


-- 
Santilli Jonathan
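Why a constant -1 matters: in 2.3 the Javadoc describes previousTimestamp roughly as the latest extracted valid timestamp of the current record's partition (or -1 if unknown), and a typical use is falling back to it when a record carries no valid timestamp of its own (roughly the idea behind Kafka's built-in UsePreviousTimeOnInvalidTimestamp). The sketch below shows that fallback without any Kafka dependency. SimpleRecord and FallbackTimestamps are hypothetical stand-ins (a real extractor would implement TimestampExtractor and read record.timestamp()), and returning -1 when nothing can be inferred is an assumption of this sketch, not Kafka's behaviour.

```java
/**
 * Hypothetical stand-in for ConsumerRecord: only the embedded timestamp is modelled.
 * A negative value means "no valid timestamp", like ConsumerRecord.NO_TIMESTAMP (-1).
 */
final class SimpleRecord {
    final long timestamp;

    SimpleRecord(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Sketch of fallback logic that an extract(record, previousTimestamp) method might use. */
final class FallbackTimestamps {
    /** Sentinel this sketch uses for "unknown / no usable timestamp". */
    static final long UNKNOWN = -1L;

    private FallbackTimestamps() {}

    static long extract(SimpleRecord record, long previousTimestamp) {
        // 1. Prefer the record's own timestamp when it is valid.
        if (record.timestamp >= 0) {
            return record.timestamp;
        }
        // 2. Otherwise fall back to the partition's latest valid timestamp, if known.
        if (previousTimestamp >= 0) {
            return previousTimestamp;
        }
        // 3. Nothing usable. With previousTimestamp stuck at -1, every record
        //    without a valid timestamp of its own ends up here.
        return UNKNOWN;
    }
}
```

With the behaviour reported in this thread, step 2 can never apply, so any record lacking a valid embedded timestamp falls straight through to the unknown case.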


Posted by Sophie Blee-Goldman <so...@confluent.io>.

Thanks for the notice, Jonathan! We tracked down the problem, and it should
be an easy fix: https://github.com/apache/kafka/pull/6719/files
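Matthias's remark that 2.3 changed how partition time is tracked is the crux here. As a rough illustration only (this is not the Kafka Streams code, nor the change in the linked pull request), the sketch below keeps a per-partition time equal to the highest valid timestamp extracted so far and passes it to the extractor as previousTimestamp, starting from -1 for a partition with no history. PartitionTimeTracker, the String partition key, and the LongBinaryOperator standing in for TimestampExtractor#extract are all hypothetical. Under this model only the first record of a partition sees -1; the 2.3 behaviour reported above looks, from the extractor's side, as if partition time never advanced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/**
 * Hypothetical sketch: tracks "partition time" per partition as the highest valid
 * timestamp extracted so far and hands it to the extractor as previousTimestamp.
 */
final class PartitionTimeTracker {
    static final long UNKNOWN = -1L;

    // partition id -> highest valid extracted timestamp seen so far
    private final Map<String, Long> timeByPartition = new HashMap<>();

    // (recordTimestamp, previousTimestamp) -> extracted timestamp;
    // stands in for TimestampExtractor#extract in this sketch
    private final LongBinaryOperator extractor;

    PartitionTimeTracker(LongBinaryOperator extractor) {
        this.extractor = extractor;
    }

    /** Current partition time, or UNKNOWN (-1) if nothing valid has been extracted yet. */
    long partitionTime(String partition) {
        return timeByPartition.getOrDefault(partition, UNKNOWN);
    }

    /** Extracts a timestamp for one record and advances partition time if it is valid and newer. */
    long onRecord(String partition, long recordTimestamp) {
        long previous = partitionTime(partition);
        long extracted = extractor.applyAsLong(recordTimestamp, previous);
        if (extracted >= 0 && extracted > previous) {
            timeByPartition.put(partition, extracted);
        }
        return extracted;
    }
}
```

For records with timestamps 100, 50 and 200 on one partition, an extractor that simply returns the record timestamp would see previousTimestamp values of -1, 100 and 100 under this model.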
