Posted to users@kafka.apache.org by Pavel Koroliov <af...@gmail.com> on 2018/10/29 12:28:25 UTC

Problem with kafka-streams aggregate windowedBy

Hi everyone! I'm using kafka-streams and I have a problem with
windowedBy. Everything works well until I restart the application; after
restarting, my aggregation starts over from the beginning.
Code below:
>
>     StreamsBuilder builder = new StreamsBuilder()
>     KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
>
>     // aggregate() returns the KTable; to() returns void, so the table
>     // has to be captured before the toStream()...to() chain
>     KTable table = stream.groupByKey()
>             .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
>             .aggregate(
>                     { new AggregatorModel() },
>                     { key, value, aggregate -> aggregate.add(value) }
>             )
>
>     table.toStream()
>             .map({ k, v -> new KeyValue<>(k.window().end(), v) })
>             .to('output')
>
>     def config = new Properties()
>     config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
>     config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
>
>     KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
>     kafkaStreams.start()
>
>
I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the config, set
to both 'latest' and 'earliest', but it didn't help.
Can you help me understand what I'm doing wrong?
Thank you.

Re: Problem with kafka-streams aggregate windowedBy

Posted by Vincenzo D'Amore <v....@gmail.com>.
Hi Pavel, did you understand why you were seeing that strange behaviour?

On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov <af...@gmail.com>
wrote:

> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application, some aggregations start from
> the beginning even though those keys already have aggregated data, while other
> aggregations continue to accumulate. This is very strange; I did not
> expect this behaviour.
>


-- 
Vincenzo D'Amore

Re: Problem with kafka-streams aggregate windowedBy

Posted by Pavel Koroliov <af...@gmail.com>.
I'm sorry, guys. Aggregation works fine, but I've found a new problem with
*groupByKey()*. After restarting the application, some aggregations start from
the beginning even though those keys already have aggregated data, while other
aggregations continue to accumulate. This is very strange; I did not
expect this behaviour.
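
One way to see which keys actually kept their state across a restart is to
give the windowed store an explicit name and query it after restarting. A
minimal sketch, assuming the same 15-minute windows as before; the store name
'agg-store' and the key 'some-key' are placeholders:

    // Name the store when aggregating so it can be queried later
    // ('agg-store' is a placeholder; any unique name works):
    KTable table = stream.groupByKey()
            .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
            .aggregate(
                    { new AggregatorModel() },
                    { key, value, aggregate -> aggregate.add(value) },
                    Materialized.as('agg-store')
            )

    // After a restart, once the instance is RUNNING, fetch a key's
    // recent windows to see whether its previous state survived:
    def store = kafkaStreams.store('agg-store', QueryableStoreTypes.windowStore())
    def now = System.currentTimeMillis()
    def iter = store.fetch('some-key', now - TimeUnit.MINUTES.toMillis(15), now)
    iter.each { entry -> println "window ${entry.key}: ${entry.value}" }
    iter.close()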

Tue, 30 Oct 2018 at 00:43, Matthias J. Sax <ma...@confluent.io>:

> Make sure to call `KafkaStreams#close()` to get the latest offsets
> committed.
>
> Besides this, you can check the consumer and Streams logs at DEBUG level
> to see which offsets are picked up (or not).
>
>
> -Matthias

Re: Problem with kafka-streams aggregate windowedBy

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Make sure to call `KafkaStreams#close()` to get the latest offsets
committed.

Besides this, you can check the consumer and Streams logs at DEBUG level
to see which offsets are picked up (or not).
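
For example, a common pattern is to call `close()` from a JVM shutdown hook,
so the latest offsets get committed even when the application is stopped with
Ctrl-C; a minimal sketch in the same Groovy style as the code above:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
    kafkaStreams.start()

    // Commit the latest offsets and flush the state stores on shutdown.
    Runtime.runtime.addShutdownHook(new Thread({
        kafkaStreams.close()
    }))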


-Matthias

On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> Hi
> How long does your application run? Longer than the 60 seconds you set for the commit interval?
> Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+ 
> and check whether your offsets are really committed.
> Best regards
> Patrik


Re: Problem with kafka-streams aggregate windowedBy

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
How long does your application run? Longer than the 60 seconds you set for the commit interval?
Have a look at https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+ 
and check whether your offsets are really committed.
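
Since Streams uses the application.id as its consumer group id, you can also
read the committed offsets programmatically; a minimal sketch with the
AdminClient, where the group id 'my-app' is a placeholder for your actual
application.id:

    import org.apache.kafka.clients.admin.AdminClient

    def props = new Properties()
    props.put('bootstrap.servers', 'localhost:9092')

    def admin = AdminClient.create(props)
    // Committed offsets live under the consumer group whose id equals
    // the Streams application.id ('my-app' is a placeholder):
    def offsets = admin.listConsumerGroupOffsets('my-app')
            .partitionsToOffsetAndMetadata()
            .get()
    offsets.each { partition, meta -> println "${partition}: ${meta.offset()}" }
    admin.close()

If nothing shows up after the commit interval has passed, the offsets were
never committed in the first place.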
Best regards
Patrik

> On 29.10.2018 at 18:20, Pavel Koroliov <af...@gmail.com> wrote:
> 
> Hi
> No, my application id doesn't change

Re: Problem with kafka-streams aggregate windowedBy

Posted by Pavel Koroliov <af...@gmail.com>.
Hi
No, my application id doesn't change

Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pk...@gmail.com>:

> Hi
> Does your applicationId change?
> Best regards
> Patrik

Re: Problem with kafka-streams aggregate windowedBy

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
Does your applicationId change?
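
The application.id matters because Streams also uses it as the consumer group
id and as the prefix for state directories and internal topics, so a new id
starts from scratch. A hypothetical way this goes wrong, for illustration:

    // BAD: a new group and new state stores on every run
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, 'my-app-' + UUID.randomUUID())
    // GOOD: a stable id lets offsets and state survive restarts
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, 'my-app')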
Best regards 
Patrik

> On 29.10.2018 at 13:28, Pavel Koroliov <af...@gmail.com> wrote:
> 
> Hi everyone! I'm using kafka-streams and I have a problem with
> windowedBy. Everything works well until I restart the application; after
> restarting, my aggregation starts over from the beginning.