Posted to users@kafka.apache.org by Jonathan Santilli <jo...@gmail.com> on 2019/06/26 13:13:21 UTC

org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Hello, hope you all are doing well,

I am testing the new version 2.3, Kafka Streams specifically. I have
noticed that the previousTimestamp argument passed to the extract method of
the interface org.apache.kafka.streams.processor.TimestampExtractor

public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp)

is now always -1.

The previous version, 2.2.1, passed the correct value for the record's
partition.

I am aware the interface is marked as @InterfaceStability.Evolving and we
should not rely on its stability/compatibility. I am just wondering whether
this new behavior is intentional or a bug.
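
For context on why this matters: an extractor may fall back to
previousTimestamp when a record carries no usable timestamp, which is what
the built-in UsePreviousTimeOnInvalidTimestamp does. A minimal sketch of
that pattern (illustrative only, this is not my actual extractor):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class FallbackTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long ts = record.timestamp(); // producer/broker timestamp
        if (ts >= 0) {
            return ts;
        }
        // Fall back to the latest extracted timestamp of this record's
        // partition. If previousTimestamp is stuck at -1, this returns an
        // invalid timestamp and Streams skips the record.
        return previousTimestamp;
    }
}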


Cheers!
-- 
Santilli Jonathan

Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Thanks for the notice Jonathan! We tracked down the problem and it should
be an easy fix: https://github.com/apache/kafka/pull/6719/files


Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Jonathan Santilli <jo...@gmail.com>.
Thanks a lot, Bill, for creating the issue; I have updated it with a bit
more info.

Cheers!
--
Jonathan





Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Bill Bejeck <bi...@confluent.io>.
Jonathan, Matthias

I've created a Jira for this issue
https://issues.apache.org/jira/browse/KAFKA-8615.

Jonathan, I plan to work on this when I get back from vacation on 7/8. If
you would like to work on this yourself before then, feel free to do so and
assign the ticket to yourself.

Thanks,
Bill


Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sounds like a regression to me.

We did change some code to track partition time differently. Can you
open a Jira?
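
The contract is that previousTimestamp carries the highest valid timestamp
extracted so far for the record's partition. Roughly (illustrative
pseudocode of that contract, not the actual Streams internals):

// Illustrative only: how per-partition time should be maintained
// and handed to the extractor.
long partitionTime = -1L; // unknown until the first valid timestamp
...
final long ts = extractor.extract(record, partitionTime);
partitionTime = Math.max(partitionTime, ts); // track the highest timestamp

If the tracking step stopped updating partitionTime, every call would see
-1, which matches the reported symptom.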


-Matthias



Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Jonathan Santilli <jo...@gmail.com>.
Sure Bill, it is the same code for which I reported the suppress issue some
months ago:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio

In fact, I reported back then that, after restarting the app, the suppress
operator was sending already-processed records downstream again.
Now, with version 2.2.1+, after restarting the app, the aggregation/suppress
(I do not know exactly where) misses some records that should be aggregated,
even though they are present in the input topic.

Kafka Version 2.3

public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        // previousTimestamp is always == -1 here on 2.3
        ...
    }
}

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames,
                Consumed.with(..., ...)
                        .withTimestampExtractor(new OwnTimeExtractor()));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
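
For what it's worth: since advanceBy equals the window size, these are
tumbling windows, and untilWindowCloses only emits a window once stream time
passes its end plus the grace period, so I assume anything that breaks the
timestamp/partition-time tracking directly affects which records get
aggregated and emitted.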





-- 
Santilli Jonathan

Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Bill Bejeck <bi...@confluent.io>.
Thanks for the reply Jonathan.

Are you in a position to share your code so I can try to reproduce on my
end?

-Bill



Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Jonathan Santilli <jo...@gmail.com>.
Hello Bill,

I am implementing the TimestampExtractor interface and then using it when
consuming, like:

final KStream<..., ...> events = builder.stream(inputTopicList,
        Consumed.with(keySerde, valueSerde)
                .withTimestampExtractor(new OwnTimeExtractor(...)));

I am not setting the default.timestamp.extractor config value.
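
For comparison, setting it globally would look roughly like this (a sketch;
the application id and bootstrap servers are placeholders, and it assumes
OwnTimeExtractor has a no-arg constructor):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// same config key as "default.timestamp.extractor"
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        OwnTimeExtractor.class);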

Cheers!
--
Jonathan




-- 
Santilli Jonathan

Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Jonathan,

Thanks for reporting this.  Which timestamp extractor are you using in the
configs?

Thanks,
Bill
