Posted to users@kafka.apache.org by Vincenzo D'Amore <v....@gmail.com> on 2019/01/27 23:48:27 UTC

Re: Problem with kafka-streams aggregate windowedBy

Hi Pavel, did you understand why you had such strange behaviour?

On Tue, Oct 30, 2018 at 12:22 PM Pavel Koroliov <af...@gmail.com>
wrote:

> I'm sorry, guys. Aggregation works fine, but I've found a new problem with
> *groupByKey()*. After restarting the application, some aggregations start
> from the beginning, although their keys already have aggregated data, while
> other aggregations continue to accumulate data. This is very strange; I did
> not expect such behavior)))).
>
> On Tue, 30 Oct 2018 at 0:43, Matthias J. Sax <ma...@confluent.io> wrote:
>
> > Make sure to call `KafkaStreams#close()` to get the latest offsets
> > committed.
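> >
> > For example, a minimal sketch (assuming the `kafkaStreams` instance from
> > your code below, closed in a JVM shutdown hook):
> >
> >     // close() commits the latest offsets and flushes local state
> >     // before the JVM exits
> >     Runtime.getRuntime().addShutdownHook(new Thread({
> >         kafkaStreams.close()
> >     } as Runnable))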
> >
> > Besides this, you can check the consumer and Streams logs in DEBUG mode
> > to see which offsets are picked up (or not).
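> >
> > For example, in log4j.properties (a sketch; these are the client and
> > Streams packages):
> >
> >     log4j.logger.org.apache.kafka.clients.consumer=DEBUG
> >     log4j.logger.org.apache.kafka.streams=DEBUG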
> >
> >
> > -Matthias
> >
> > On 10/29/18 11:43 AM, Patrik Kleindl wrote:
> > > Hi
> > > How long does your application run? More than the 60 seconds you set
> > > for the commit interval?
> > > Have a look at
> > > https://sematext.com/opensee/m/Kafka/uyzND1SdDRMgROjn?subj=Re+Kafka+Streams+why+aren+t+offsets+being+committed+
> > > and check whether your offsets are really committed.
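> > >
> > > For example, a sketch using the AdminClient (assuming brokers and
> > > clients on Kafka 2.0+; in Streams the group id is your application id,
> > > i.e. the same applicationId variable as in your code):
> > >
> > >     import org.apache.kafka.clients.admin.AdminClient
> > >
> > >     def props = new Properties()
> > >     props.put('bootstrap.servers', 'localhost:9092')
> > >     def admin = AdminClient.create(props)
> > >     // print the committed offset per partition for the Streams group
> > >     admin.listConsumerGroupOffsets(applicationId)
> > >          .partitionsToOffsetAndMetadata().get()
> > >          .each { tp, om -> println "${tp}: ${om.offset()}" }
> > >     admin.close()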
> > > Best regards
> > > Patrik
> > >
> > >> On 29.10.2018 at 18:20, Pavel Koroliov <af...@gmail.com> wrote:
> > >>
> > >> Hi
> > >> No, my application id doesn't change
> > >>
> > >>> On Mon, 29 Oct 2018 at 19:11, Patrik Kleindl <pk...@gmail.com> wrote:
> > >>
> > >>> Hi
> > >>> Does your applicationId change?
> > >>> Best regards
> > >>> Patrik
> > >>>
> > >>>> On 29.10.2018 at 13:28, Pavel Koroliov <afgmeister@gmail.com> wrote:
> > >>>>
> > >>>> Hi everyone! I use kafka-streams, and I have a problem when I use
> > >>>> windowedBy. Everything works well until I restart the application.
> > >>>> After restarting, my aggregation starts from the beginning.
> > >>>> Code below:
> > >>>>>
> > >>>>>   import org.apache.kafka.common.serialization.Serdes
> > >>>>>   import org.apache.kafka.streams.KafkaStreams
> > >>>>>   import org.apache.kafka.streams.KeyValue
> > >>>>>   import org.apache.kafka.streams.StreamsBuilder
> > >>>>>   import org.apache.kafka.streams.StreamsConfig
> > >>>>>   import org.apache.kafka.streams.kstream.Consumed
> > >>>>>   import org.apache.kafka.streams.kstream.KStream
> > >>>>>   import org.apache.kafka.streams.kstream.TimeWindows
> > >>>>>   import java.util.concurrent.TimeUnit
> > >>>>>
> > >>>>>   StreamsBuilder builder = new StreamsBuilder()
> > >>>>>   KStream stream = builder.stream(topic,
> > >>>>>           Consumed.with(Serdes.String(), Serdes.String()))
> > >>>>>
> > >>>>>   // 15-minute tumbling windows, aggregated per key; to() returns
> > >>>>>   // void, so the end of the chain cannot be assigned to a KTable
> > >>>>>   stream.groupByKey()
> > >>>>>           .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
> > >>>>>           .aggregate(
> > >>>>>                   { new AggregatorModel() },
> > >>>>>                   { key, value, aggregate -> aggregate.add(value) }
> > >>>>>           )
> > >>>>>           .toStream()
> > >>>>>           // re-key each result by the end timestamp of its window
> > >>>>>           .map({ k, v -> new KeyValue<>(k.window().end(), v) })
> > >>>>>           .to('output')
> > >>>>>
> > >>>>>   def config = new Properties()
> > >>>>>   config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> > >>>>>   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092')
> > >>>>>   // commit offsets (and flush state stores) every 60 seconds
> > >>>>>   config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>>           TimeUnit.SECONDS.toMillis(60))
> > >>>>>
> > >>>>>   KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
> > >>>>>   kafkaStreams.start()
> > >>>>>
> > >>>>>
> > >>>> I've tried adding ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to the
> > >>>> config, set to both 'latest' and 'earliest', but it didn't help.
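> > >>>> In code, that was roughly (using the same `config` object as above):
> > >>>>
> > >>>>   // requires: import org.apache.kafka.clients.consumer.ConsumerConfig
> > >>>>   config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest') // also tried 'latest'
> > >>>>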
> > >>>> Can you help me understand what I'm doing wrong?
> > >>>> Thank you.
> > >>>
> > >
> >
> >
>


-- 
Vincenzo D'Amore