Posted to users@kafka.apache.org by Pavel Koroliov <af...@gmail.com> on 2018/10/29 12:28:25 UTC
Problem with kafka-streams aggregate windowedBy
Hi everyone! I use kafka-streams, and I have a problem when I use
windowedBy. Everything works well until I restart the application. After
restarting, my aggregation starts from the beginning.
Code below:
>
> StreamsBuilder builder = new StreamsBuilder()
> KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
>
> KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>     .aggregate(
>         { new AggregatorModel() },
>         { key, value, aggregate ->
>             return aggregate.add(value)
>         }
>     )
>     .toStream()
>     .map({ k, v ->
>         new KeyValue<>(k.window().end(), v)
>     })
>     .to('output')
>
> def config = new Properties()
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
>
> KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> kafkaStreams.start()
>
>
I've tried setting ConsumerConfig.AUTO_OFFSET_RESET_CONFIG in the config to
'latest' and to 'earliest', but it didn't help.
Can you help me understand what I'm doing wrong?
Thank you.
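For reference, here is a minimal sketch of how the 15-minute windows in the code above are laid out. It assumes the usual Kafka Streams behaviour for tumbling windows: aligned to the epoch, start inclusive, end exclusive. The class WindowBounds and its method are made up for illustration and use only the JDK, not the Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Kafka Streams: derives the bounds of an
// epoch-aligned tumbling window from a record timestamp.
final class WindowBounds {
    final long startMs; // inclusive
    final long endMs;   // exclusive

    WindowBounds(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Round the timestamp down to a multiple of the window size.
    // Assumes non-negative record timestamps, as Kafka records normally have.
    static WindowBounds forTimestamp(long timestampMs, long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        long start = Math.floorDiv(timestampMs, sizeMs) * sizeMs;
        return new WindowBounds(start, start + sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(15).toMillis();
        long ts = Instant.parse("2018-10-29T12:28:25Z").toEpochMilli();
        WindowBounds w = forTimestamp(ts, size);
        System.out.println(Instant.ofEpochMilli(w.startMs) + " .. " + Instant.ofEpochMilli(w.endMs));
    }
}
```

Under that assumption, a record timestamped 12:28:25 UTC lands in the window from 12:15:00 to 12:30:00, so the key written to 'output' by k.window().end() would be the 12:30:00 boundary in epoch milliseconds.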
Re: Problem with kafka-streams aggregate windowedBy
Posted by Vincenzo D'Amore <v....@gmail.com>.
Hi Pavel, did you figure out why you are seeing this strange behaviour?
On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov <af...@gmail.com>
wrote:
> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application, some aggregations start
> from the beginning, although the key already has aggregated data, while
> other aggregations continue to accumulate data. This is very strange; I did
> not expect such behavior :)
--
Vincenzo D'Amore
Re: Problem with kafka-streams aggregate windowedBy
Posted by Pavel Koroliov <af...@gmail.com>.
I'm sorry, guys. Aggregation works fine, but I've found a new problem with
*groupByKey()*. After restarting the application, some aggregations start
from the beginning, although the key already has aggregated data, while
other aggregations continue to accumulate data. This is very strange; I did
not expect such behavior :)
On Tue, 30 Oct 2018 at 0:43, Matthias J. Sax <ma...@confluent.io> wrote:
> Make sure to call `KafkaStreams#close()` to get the latest offsets
> committed.
>
> Besides this, you can check the consumer and Streams logs in DEBUG mode,
> to see what offset is picked up (or not).
>
>
> -Matthias
Re: Problem with kafka-streams aggregate windowedBy
Posted by "Matthias J. Sax" <ma...@confluent.io>.
Make sure to call `KafkaStreams#close()` to get the latest offsets
committed.
Besides this, you can check the consumer and Streams logs in DEBUG mode,
to see what offset is picked up (or not).
-Matthias
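One common way to make sure close() actually runs on restart is a JVM shutdown hook. The sketch below is only an illustration under that assumption: ShutdownSketch and closeOnShutdown are hypothetical names, and a plain Runnable stands in for the KafkaStreams instance so that the example needs nothing beyond the JDK.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Kafka: runs a close action at most once
// when the JVM shuts down normally (SIGTERM, Ctrl+C, System.exit).
final class ShutdownSketch {

    // Registers the hook and returns it so callers can inspect or remove it.
    static Thread closeOnShutdown(Runnable closeAction) {
        AtomicBoolean done = new AtomicBoolean(false);
        Thread hook = new Thread(() -> {
            // Guard so the action runs only once, even if triggered manually first.
            if (done.compareAndSet(false, true)) {
                closeAction.run();
            }
        }, "close-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // Stand-in for the real close call; see the note below.
        closeOnShutdown(() -> System.out.println("closing streams"));
        System.out.println("application running; the hook fires when the JVM exits");
    }
}
```

With the code from the original post, the close action would be the call to kafkaStreams.close(), registered before kafkaStreams.start(). Note that shutdown hooks do not run when the process is killed with SIGKILL (kill -9), so in that case nothing gets a chance to commit.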
On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> Hi
> How long does your application run? More than the 60 seconds you set for the commit interval?
> Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> and check if your offsets are really committed
> Best regards
> Patrik
Re: Problem with kafka-streams aggregate windowedBy
Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
How long does your application run? More than the 60 seconds you set for the commit interval?
Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
and check if your offsets are really committed
Best regards
Patrik
> On 29.10.2018 at 18:20, Pavel Koroliov <af...@gmail.com> wrote:
>
> Hi
> No, my application id doesn't change
Re: Problem with kafka-streams aggregate windowedBy
Posted by Pavel Koroliov <af...@gmail.com>.
Hi
No, my application id doesn't change
On Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pk...@gmail.com> wrote:
> Hi
> Does your applicationId change?
> Best regards
> Patrik
Re: Problem with kafka-streams aggregate windowedBy
Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
Does your applicationId change?
Best regards
Patrik