Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2019/07/09 04:42:34 UTC

Re: Streams reprocessing whole topic when deployed but not locally

I think I'm having this issue again, though this time it only happens on
some state stores.

Here you can find the code and the logs:
https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
We first saw that our Confluent Cloud bill went up 10x, then saw that our
streams processor had been restarted 12 times (Kubernetes pod); checking
Confluent Cloud usage, it seems that writes/reads went up from the usual
1-2 KB/s to 12-20 MB/s during the app restarts.

I then deployed a new version in a new container (no local store/state)
to see what happened:
 - first, it logs everything up to line 460 of the log file in the gist
 - at this point Confluent Cloud reports high read usage and the consumer
lag starts to increase; the app is accumulating a backlog of messages
 - after a certain point, writes go up as well
 - when the streams app transitions to the RUNNING state, the final
aggregation function resumes where it stopped (without reprocessing old data)
 - consumer lag goes back to 0

What makes me think it's re-reading everything are these lines:

Resetting offset for partition
myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
20202847
Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-0000000004
from beginning of the changelog
myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2

At first I thought it was because I don't persist the aggregate store
changelog as I do with the "LastValueStore" store, which has
"withLoggingEnabled()", but even that store shows:

Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
offset 403910
Restoring task 0_0's state store LastValueStore from beginning of the
changelog myapp-id-LastValueStore-changelog-0
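For context, a store with a logged changelog like the "LastValueStore" above is typically declared along these lines (a Java sketch, not the actual gist code; the store type and serdes here are assumptions):

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// A persistent key-value store; withLoggingEnabled() backs it with a
// changelog topic (e.g. myapp-id-LastValueStore-changelog) that is
// replayed on restore. Extra changelog topic configs can be passed in the map.
StoreBuilder<KeyValueStore<String, Double>> lastValueStore =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("LastValueStore"),
            Serdes.String(),
            Serdes.Double())
        .withLoggingEnabled(Map.of());
```

The "Restoring ... from beginning of the changelog" log lines above come from exactly this restore path: with no local RocksDB state, the whole changelog topic is replayed.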

Thank you everyone in advance

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> I'm not sure; one thing I know for sure is that on the cloud control
> panel, in the consumer lag page, the offset didn't reset on the input
> topic, so it was probably something after that.
>
> Anyway, thanks a lot for helping; if we experience this again I'll try to
> add more verbose logging to better understand what's going on.
>
> Have a great day
>
> --
> Alessandro Tagliapietra
>
>
> On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Honestly I cannot think of an issue that was fixed in 2.2.1 but not in
>> 2.2.0 which could be correlated with your observations:
>>
>>
>> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
>>
>> If you observed on the cloud that both partitions of the source topic
>> get re-processed from the beginning, then it means the committed offsets
>> were somehow lost, and hence it has to start reading the source topic
>> from scratch.
>> If this is a reproducible issue, maybe there are some lurking problems in
>> 2.2.0.
>>
>> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
>> tagliapietra.alessandro@gmail.com> wrote:
>>
>> > Yes, that's right.
>> >
>> > Could that be the problem? Anyway, so far after upgrading from 2.2.0 to
>> > 2.2.1 we haven't experienced the problem anymore.
>> >
>> > Regards
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wa...@gmail.com> wrote:
>> > >
>> > > That's right, but local state is used as a "materialized view" of your
>> > > changelog topics: if you have nothing locally, then it has to bootstrap
>> > > from the beginning of your changelog topic.
>> > >
>> > > But I think your question was about the source "sensors-input" topic, not
>> > > the changelog topic. I looked at the logs from two runs, and it seems
>> > > locally your sensors-input has one partition, but on the cloud your
>> > > sensors-input has two partitions. Is that right?
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
>> > > tagliapietra.alessandro@gmail.com> wrote:
>> > >
>> > > > Isn't the windowing state stored in the additional state store
>> > > > topics that I had to create separately?
>> > > >
>> > > > Like these I have here:
>> > > >
>> > > > sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
>> > > > sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
>> > > >
>> > > > Thank you
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > >
>> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <wa...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > If you deploy your streams app into a docker container, you'd need to
>> > > > > make sure local state directories are preserved, since otherwise whenever
>> > > > > you restart, all the state would be lost and Streams then has to bootstrap
>> > > > > from scratch. E.g. if you are using K8s for cluster management, you'd better
>> > > > > use stateful sets to make sure local state is preserved across
>> > > > > re-deployment.
>> > > > >
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
>> > > > > tagliapietra.alessandro@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > sorry, by "app" I mean the stream processor app, the one shown in
>> > > > > > pipeline.kt.
>> > > > > >
>> > > > > > The app reads a topic of data sent by a sensor each second and
>> > > > > > generates a 20 second window output to another topic.
>> > > > > > My "problem" is that when running locally with my local Kafka setup,
>> > > > > > let's say I stop it and start it again, it continues processing the
>> > > > > > last window.
>> > > > > > When deploying the app into a docker container and using Confluent
>> > > > > > Cloud as the broker, every time I restart the app it starts
>> > > > > > processing again from the beginning of the input topic and generates
>> > > > > > again old windows it already processed.
>> > > > > >
>> > > > > > In the meantime I'm trying to upgrade to Kafka 2.2.1 to see if I
>> > > > > > get any improvement.
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <
>> wangguoz@gmail.com>
>> > > > wrote:
>> > > > > >
>> > > > > > > Hello Alessandro,
>> > > > > > >
>> > > > > > > What did you do for `restarting the app online`? I'm not sure I
>> > > > > > > follow the difference between "restart the streams app" and
>> > > > > > > "restart the app online" from your description.
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
>> > > > > > > tagliapietra.alessandro@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Hello everyone,
>> > > > > > > >
>> > > > > > > > I have a small streams app; the configuration and part of the
>> > > > > > > > code I'm using can be found here:
>> > > > > > > > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
>> > > > > > > > There's also the log when the app is started locally and when
>> > > > > > > > the app is started on our servers connecting to the Confluent
>> > > > > > > > Cloud Kafka broker.
>> > > > > > > >
>> > > > > > > > The problem is that locally everything is working properly: if
>> > > > > > > > I restart the streams app it just continues where it left off,
>> > > > > > > > but if I restart the app online it reprocesses the whole topic.
>> > > > > > > >
>> > > > > > > > That shouldn't happen, right?
>> > > > > > > >
>> > > > > > > > Thanks in advance
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > Alessandro Tagliapietra
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Re: Streams reprocessing whole topic when deployed but not locally

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Thanks Bruno and Patrik,

My fault was that, since topic auto-creation is set to false on Confluent
Cloud, I manually created the topics without taking care to change the
cleanup policy.
Because of that the store changelogs kept the whole change history; in fact,
after enabling the compact policy the storage used went down from 25 GB to 2 GB.
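For anyone hitting the same issue, an existing changelog topic's cleanup policy can be switched with the standard Kafka CLI tools (the topic name and broker address below are illustrative):

```shell
# Switch an existing changelog topic from cleanup.policy=delete to compact
# (on Confluent Cloud, also pass --command-config with your API credentials)
kafka-configs --bootstrap-server "$BOOTSTRAP" \
  --entity-type topics --entity-name myapp-id-LastValueStore-changelog \
  --alter --add-config cleanup.policy=compact

# Verify the change
kafka-configs --bootstrap-server "$BOOTSTRAP" \
  --entity-type topics --entity-name myapp-id-LastValueStore-changelog \
  --describe
```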

So I guess the stream was reading many GB to catch up with the changes.
I've also enabled persistence for the RocksDB local store after watching
this talk:
https://www.confluent.io/kafka-summit-sf18/deploying-kafka-streams-applications

After doing both things I think everything should be working fine now.

Thank you again for your help!

--
Alessandro Tagliapietra

On Wed, Jul 10, 2019 at 5:37 AM Patrik Kleindl <pk...@gmail.com> wrote:

> Hi
> Regarding the I/O, RocksDB has something called write amplification which
> writes the data to multiple levels internally to enable better optimization
> at the cost of storage and I/O.
> This is also the reason the stores can get larger than the topics
> themselves.
> This can be modified by RocksDB settings, the simplest of them being
> compression.
> Setting this to lz4 has helped us quite a lot.
>
> Regarding compacted topics: since compaction does not happen immediately
> and has a rather high threshold by default, this can take a while.
> This means your application might have to restore a lot more messages than
> you assume.
>
> Hope that helps
> Best regards
> Patrik
>
> > Am 10.07.2019 um 14:22 schrieb Bruno Cadonna <br...@confluent.io>:
> >
> > Hi Alessandro,
> >
> >> - how do I specify the retention period of the data? Just by setting the
> >> max retention time for the changelog topic?
> >
> > For window and session stores, you can set retention time on a local
> > state store by using Materialized.withRetention(...). Consult the
> > javadocs for details. If the changelog topic on the broker does not
> > yet exist, the retention time of the changelog topic will have the
> > same retention time as the local store.
> >
> > Additionally, you could try to set the retention time in the streams
> > configuration by using `StreamsConfig.topicPrefix("retention.ms")`.
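A sketch of what Bruno's `Materialized.withRetention(...)` suggestion looks like in code (the store name, types, and one-day retention below are illustrative, not taken from the gist):

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

// Keep windowed aggregation state (and, if the topic doesn't exist yet,
// its changelog) for one day.
Materialized<String, Double, WindowStore<Bytes, byte[]>> materialized =
    Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("aggregate-store")
        .withRetention(Duration.ofDays(1));
```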
> >
> >> - wouldn't it be possible, for example for my LastValueStore, to compact
> >> the changelog and keep only the last value stored for each key? Because
> >> that's all I would need for my use case
> >
> > That is what changelogs do. They just keep the last value written.
> >
> >> - why is it also writing that much?
> >
> > Could it be that after the restore of the state stores from the
> > changelogs, your streams app just starts working and there is a peak in
> > writes because the app has a large lag initially? Maybe somebody
> > else has a better explanation.
> >
> > Best,
> > Bruno
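Bruno's point that a compacted changelog keeps only the last value written per key can be illustrated with plain Java (no Kafka involved; this just mimics compaction semantics):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionDemo {
    // Simulates log compaction: replays a changelog and keeps only the
    // latest value seen for each key, preserving first-insertion order.
    static Map<String, String> compact(List<Map.Entry<String, String>> changelog) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            latest.put(record.getKey(), record.getValue()); // later writes win
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = List.of(
            Map.entry("sensor-1", "20.1"),
            Map.entry("sensor-2", "19.8"),
            Map.entry("sensor-1", "20.4"),  // overwrites the first sensor-1 value
            Map.entry("sensor-1", "20.7"));
        System.out.println(compact(changelog)); // {sensor-1=20.7, sensor-2=19.8}
    }
}
```

This is why switching the changelogs to cleanup.policy=compact shrank the topics so much: all the intermediate values per key eventually get discarded.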
> >
> > On Wed, Jul 10, 2019 at 12:39 AM Alessandro Tagliapietra
> > <ta...@gmail.com> wrote:
> >>
> >> I've just noticed that the store topics created automatically by our
> >> streams app have a different cleanup.policy.
> >> I think that's the main reason I'm seeing that big read/write IO;
> >> having a compact policy instead of delete would make the topics much
> >> smaller.
> >> I'll try that to also see the impact on our storage usage.
> >>
> >> --
> >> Alessandro Tagliapietra
> >>
> >>
> >> On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
> >> tagliapietra.alessandro@gmail.com> wrote:
> >>
> >>> Hi Bruno,
> >>>
> >>> Oh I see, I'll try to add a persistent disk where the local stores are.
> >>> I have some other questions then:
> >>> - why is it also writing that much?
> >>> - how do I specify the retention period of the data? Just by setting the
> >>> max retention time for the changelog topic?
> >>> - wouldn't it be possible, for example for my LastValueStore, to compact
> >>> the changelog and keep only the last value stored for each key? Because
> >>> that's all I would need for my use case
> >>>
> >>> Thank you very much for your help
> >>>
> >>>> On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> >>>>
> >>>> Hi Alessandro,
> >>>>
> >>>> I am not sure I understand your issue completely. If you start your
> >>>> streams app in a new container without any existing local state, then
> >>>> it is expected that the changelog topics are read from the beginning
> >>>> to restore the local state stores. Am I misunderstanding you?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>

Re: Streams reprocessing whole topic when deployed but not locally

Posted by Patrik Kleindl <pk...@gmail.com>.
Hi
Regarding the I/O, RocksDB has something called write amplification which writes the data to multiple levels internally to enable better optimization at the cost of storage and I/O.
This is also the reason the stores can get larger than the topics themselves.
This can be modified by RocksDB settings, the simplest of them being compression.
Setting this to lz4 has helped us quite a lot.
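Patrik's compression suggestion translates to something like the following in a streams app (a sketch; it requires kafka-streams and rocksdbjni on the classpath):

```java
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;

// Enables LZ4 compression for all RocksDB-backed state stores.
public class Lz4ConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
    }
}

// Registered in the streams configuration via:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, Lz4ConfigSetter.class);
```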

Regarding compacted topics: since compaction does not happen immediately and has a rather high threshold by default, this can take a while.
This means your application might have to restore a lot more messages than you assume.

Hope that helps
Best regards 
Patrik 

> Am 10.07.2019 um 14:22 schrieb Bruno Cadonna <br...@confluent.io>:
> 
> Hi Alessandro,
> 
>> - how do I specify the retention period of the data? Just by setting the
>> max retention time for the changelog topic?
> 
> For window and session stores, you can set retention time on a local
> state store by using Materialized.withRetention(...). Consult the
> javadocs for details. If the changelog topic on the broker does not
> yet exist, the retention time of the changelog topic will have the
> same retention time as the local store.
> 
> Additionally, you could try to set the retention time in the streams
> configuration by using `StreamsConfig.TopicPrefix("retention.ms")`.
> 
>> - wouldn't be possible, for example for my LastValueStore to compact the
>> changelog and keep only the last value stored for each key? Because that's
>> all I would need for my use case
> 
> That is what changelogs do. They just keep the last value written.
> 
>> - why is it also writing that much?
> 
> Could it be that after restore of the state stores from the
> changelogs, your stream app just starts working and there is a peek in
> writes because the the app has a large lag initially? Maybe somebody
> else has a better explanation.
> 
> Best,
> Bruno
> 
> On Wed, Jul 10, 2019 at 12:39 AM Alessandro Tagliapietra
> <ta...@gmail.com> wrote:
>> 
>> I've just noticed that the store topic created automatically by our streams
>> app have different cleanup.policy.
>> I think that's the main reason I'm seeing that big read/write IO, having a
>> compact policy instead of delete would make the topic much smaller.
>> I'll try that to also see the impact on our storage usage.
>> 
>> --
>> Alessandro Tagliapietra
>> 
>> 
>> On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
>> tagliapietra.alessandro@gmail.com> wrote:
>> 
>>> Hi Bruno,
>>> 
>>> Oh I see, I'll try to add a persistent disk where the local stores are.
>>> I've other questions then:
>>> - why is it also writing that much?
>>> - how do I specify the retention period of the data? Just by setting the
>>> max retention time for the changelog topic?
>>> - wouldn't be possible, for example for my LastValueStore to compact the
>>> changelog and keep only the last value stored for each key? Because that's
>>> all I would need for my use case
>>> 
>>> Thank you very much for your help
>>> 
>>>> On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>> 
>>>> Hi Alessandro,
>>>> 
>>>> I am not sure I understand your issue completely. If you start your
>>>> streams app in a new container without any existing local state, then
>>>> it is expected that the changelog topics are read from the beginning
>>>> to restore the local state stores. Am I misunderstanding you?
>>>> 
>>>> Best,
>>>> Bruno
>>>> 
>>>> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
>>>> <ta...@gmail.com> wrote:
>>>>> 
>>>>> I think I'm having again this issue, this time though it only happens on
>>>>> some state stores.
>>>>> 
>>>>> Here you can find the code and the logs
>>>>> https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
>>>>> We first seen that our confluent cloud bill went up 10x, then seen that
>>>> our
>>>>> streams processor was restarted 12 times (kubernetes pod), checking
>>>>> confluent cloud usage it seems that the writes/reads went up from the
>>>> usual
>>>>> 1-2 KB/s to 12-20 MB/s during app restarts.
>>>>> 
>>>>> I've then deployed a new version on a new container (no local
>>>> store/state)
>>>>> to see what happened:
>>>>> - first, it logs everything up to line 460 of the log file in the gist
>>>>> - at this point confluent cloud reports high read usage and the
>>>> consumer
>>>>> lag starts to increase, the app is accumulating messages behind
>>>>> - after a certain point, writes go up as well
>>>>> - when streams app transition to RUNNING state, the final aggregation
>>>>> function resumes back to where it stopped (without reprocessing old
>>>> data)
>>>>> - consumer lag goes back to 0
>>>>> 
>>>>> What makes me think it's re-reading everything are these lines:
>>>>> 
>>>>> Resetting offset for partition
>>>>> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
>>>>> 20202847
>>>>> Restoring task 0_2's state store
>>>> KSTREAM-AGGREGATE-STATE-STORE-0000000004
>>>>> from beginning of the changelog
>>>>> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2
>>>>> 
>>>>> At first I thought it's because I don't persist the aggregate store
>>>>> changelog as I do with the "LastValueStore" store which has
>>>>> "withLoggingEnabled()", but even that store has:
>>>>> 
>>>>> Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
>>>>> offset 403910
>>>>> Restoring task 0_0's state store LastValueStore from beginning of the
>>>>> changelog myapp-id-LastValueStore-changelog-0
>>>>> 
>>>>> Thank you everyone in advance
>>>>> 
>>>>> --
>>>>> Alessandro Tagliapietra
>>>>> 
>>>>> On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
>>>>> tagliapietra.alessandro@gmail.com> wrote:
>>>>> 
>>>>>> I'm not sure, one thing I know for sure is that on the cloud control
>>>>>> panel, in the consumer lag page, the offset didn't reset on the input
>>>>>> topic, so it was probably something after that.
>>>>>> 
>>>>>> Anyway, thanks a lot for helping, if we experience that again I'll
>>>> try to
>>>>>> add more verbose logging to better understand what's going on.
>>>>>> 
>>>>>> Have a great day
>>>>>> 
>>>>>> --
>>>>>> Alessandro Tagliapietra
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Honestly I cannot think of an issue that fixed in 2.2.1 but not in
>>>> 2.2.0
>>>>>>> which could be correlated to your observations:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
>>>>>>> 
>>>>>>> If you observed that on the cloud, both partitions of the source
>>>> topic
>>>>>>> gets
>>>>>>> re-processed from the beginning, then it means the committed offsets
>>>> were
>>>>>>> somehow lost, and hence has to start reading the source topic from
>>>>>>> scratch.
>>>>>>> If this is a re-producible issue maybe there are some lurking things
>>>> in
>>>>>>> 2.2.0.
>>>>>>> 
>>>>>>> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Yes that's right,
>>>>>>>> 
>>>>>>>> could that be the problem? Anyway, so far after upgrading to 2.2.1
>>>> from
>>>>>>>> 2.2.0 we didn't experience that problem anymore.
>>>>>>>> 
>>>>>>>> Regards
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Alessandro Tagliapietra
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wa...@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> That's right, but local state is used as a "materialized view"
>>>> of your
>>>>>>>>> changelog topics: if you have nothing locally, then it has to
>>>>>>> bootstrap
>>>>>>>>> from the beginning of your changelog topic.
>>>>>>>>> 
>>>>>>>>> But I think your question was about the source "sensors-input"
>>>> topic,
>>>>>>> not
>>>>>>>>> the changelog topic. I looked at the logs from two runs, and it
>>>> seems
>>>>>>>>> locally your sensors-input has one partition, but on the cloud
>>>> your
>>>>>>>>> sensors-input has two partitions. Is that right?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Isn't the windowing state stored in the additional state store
>>>>>>> topics
>>>>>>>>> that
>>>>>>>>>> I had to additionally create?
>>>>>>>>>> 
>>>>>>>>>> Like these I have here:
>>>>>>>>>> 
>>>>>>>>>> 
>>>> sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
>>>>>>>>>> 
>>>> sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
>>>>>>>>>> 
>>>>>>>>>> Thank you
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <
>>>> wangguoz@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> If you deploy your streams app into a docker container, you'd
>>>>>>> need to
>>>>>>>>>> make
>>>>>>>>>>> sure local state directories are preserved, since otherwise
>>>>>>> whenever
>>>>>>>>> you
>>>>>>>>>>> restart all the state would be lost and Streams then has to
>>>>>>> bootstrap
>>>>>>>>>> from
>>>>>>>>>>> scratch. E.g. if you are using K8s for cluster management,
>>>> you'd
>>>>>>>> better
>>>>>>>>>> use
>>>>>>>>>>> stateful sets to make sure local states are preserved across
>>>>>>>>>> re-deployment.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>> 
>>>>>>>>>>>> sorry, by "app" I mean the stream processor app, the one
>>>> shown
>>>>>>> in
>>>>>>>>>>>> pipeline.kt.
>>>>>>>>>>>> 
>>>>>>>>>>>> The app reads a topic of data sent by a sensor each second
>>>> and
>>>>>>>>>> generates
>>>>>>>>>>> a
>>>>>>>>>>>> 20 second window output to another topic.
>>>>>>>>>>>> My "problem" is that when running locally with my local
>>>> kafka
>>>>>>>> setup,
>>>>>>>>>>> let's
>>>>>>>>>>>> say I stop it and start it again, it continues processing
>>>> the
>>>>>>> last
>>>>>>>>>>> window.
>>>>>>>>>>>> When deploying the app into a docker container and using
>>>> the
>>>>>>>>> confluent
>>>>>>>>>>>> cloud as broker, every time I restart the app it starts
>>>>>>> processing
>>>>>>>>>> again
>>>>>>>>>>>> from the beginning of the input topic and generates again
>>>> old
>>>>>>>> windows
>>>>>>>>>> it
>>>>>>>>>>>> already processed.
>>>>>>>>>>>> 
>>>>>>>>>>>> In the meantime I'm trying to upgrade to kafka 2.2.1 to
>>>> see if I
>>>>>>>> get
>>>>>>>>>> any
>>>>>>>>>>>> improvement.
>>>>>>>>>>>> 
>>>>>>>>>>>> --
>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <
>>>>>>> wangguoz@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello Alessandro,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> What did you do for `restarting the app online`? I'm not
>>>> sure
>>>>>>> I
>>>>>>>>>> follow
>>>>>>>>>>>> the
>>>>>>>>>>>>> difference between "restart the streams app" and
>>>> "restart the
>>>>>>> app
>>>>>>>>>>> online"
>>>>>>>>>>>>> from your description.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
>>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I've a small streams app, the configuration and part
>>>> of the
>>>>>>>> code
>>>>>>>>>> I'm
>>>>>>>>>>>>> using
>>>>>>>>>>>>>> can be found here
>>>>>>>>>>>>>> 
>>>>>>>> https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
>>>>>>>>>>>>>> There's also the log when the app is started locally
>>>> and
>>>>>>> when
>>>>>>>> the
>>>>>>>>>> app
>>>>>>>>>>>> is
>>>>>>>>>>>>>> started on our servers connecting to the confluent
>>>> cloud
>>>>>>> kafka
>>>>>>>>>>> broker.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The problem is that locally everything is working
>>>> properly,
>>>>>>> if
>>>>>>>> I
>>>>>>>>>>>> restart
>>>>>>>>>>>>>> the streams app it just continues where it left, if I
>>>>>>> restart
>>>>>>>> the
>>>>>>>>>> app
>>>>>>>>>>>>>> online it reprocesses the whole topic.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> That shouldn't happen right?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks in advance
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>> 
>>> 

Re: Streams reprocessing whole topic when deployed but not locally

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Alessandro,

>  - how do I specify the retention period of the data? Just by setting the
> max retention time for the changelog topic?

For window and session stores, you can set the retention time on a local
state store by using Materialized.withRetention(...). Consult the
javadocs for details. If the changelog topic does not yet exist on the
broker, it will be created with the same retention time as the local
store.

Additionally, you could try to set the retention time in the streams
configuration by using `StreamsConfig.topicPrefix("retention.ms")`.
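Both approaches can be sketched as follows (an untested sketch against the Kafka Streams 2.x Java API; the store name, types, and durations are made-up illustrations, not values from the thread):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

public class RetentionSketch {
    public static void main(String[] args) {
        // 1) Retention on the local window store; if Streams creates the
        //    changelog topic from this store, the topic inherits it.
        Materialized<String, Long, WindowStore<Bytes, byte[]>> store =
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("my-window-store")
                .withRetention(Duration.ofDays(1));

        // 2) Retention via the streams config: topicPrefix("retention.ms")
        //    yields "topic.retention.ms", applied to internal topics.
        Properties props = new Properties();
        props.put(StreamsConfig.topicPrefix("retention.ms"),
                  Long.toString(Duration.ofDays(1).toMillis()));
    }
}
```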

>  - wouldn't be possible, for example for my LastValueStore to compact the
> changelog and keep only the last value stored for each key? Because that's
> all I would need for my use case

That is what changelogs do. They just keep the last value written.
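The effect of compaction can be simulated in a few lines of plain Java (illustrative only; a compacted Kafka topic converges to this state lazily, per segment, not eagerly):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionDemo {
    // Simulate log compaction: replay a changelog and keep only the
    // latest value seen for each key, preserving first-insertion order.
    static Map<String, Integer> compact(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : changelog) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
            Map.entry("sensor-1", 10),
            Map.entry("sensor-2", 20),
            Map.entry("sensor-1", 15));  // overwrites sensor-1
        System.out.println(compact(changelog));  // {sensor-1=15, sensor-2=20}
    }
}
```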

>  - why is it also writing that much?

Could it be that after the restore of the state stores from the
changelogs, your streams app just starts working and there is a peak in
writes because the app initially has a large lag? Maybe somebody
else has a better explanation.

Best,
Bruno


Re: Streams reprocessing whole topic when deployed but not locally

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
I've just noticed that the store topics created automatically by our streams
app have different cleanup.policy settings.
I think that's the main reason I'm seeing that big read/write IO: having a
compact policy instead of delete would make the topics much smaller.
I'll try that and also see the impact on our storage usage.
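Changelog topic configs can also be set from the streams code when the store is declared (an untested sketch against the Kafka Streams Java API; the store name matches the thread, but the String serdes are an assumption):

```java
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CompactChangelogSketch {
    public static StoreBuilder<KeyValueStore<String, String>> lastValueStore() {
        // Ask the broker to compact this store's changelog instead of
        // deleting by retention, so only the last value per key is kept.
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("LastValueStore"),
                Serdes.String(), Serdes.String())
            .withLoggingEnabled(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG,
                TopicConfig.CLEANUP_POLICY_COMPACT));
    }
}
```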

--
Alessandro Tagliapietra



Re: Streams reprocessing whole topic when deployed but not locally

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Hi Bruno,

Oh I see, I'll try to add a persistent disk where the local stores are.
I have other questions then:
 - why is it also writing that much?
 - how do I specify the retention period of the data? Just by setting the
max retention time for the changelog topic?
 - wouldn't it be possible, for example for my LastValueStore, to compact the
changelog and keep only the last value stored for each key? Because that's
all I would need for my use case

Thank you very much for your help

On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> I am not sure I understand your issue completely. If you start your
> streams app in a new container without any existing local state, then
> it is expected that the changelog topics are read from the beginning
> to restore the local state stores. Am I misunderstanding you?
>
> Best,
> Bruno
>
> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
> <ta...@gmail.com> wrote:
> > On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
> > tagliapietra.alessandro@gmail.com> wrote:
> >
> > > I'm not sure, one thing I know for sure is that on the cloud control
> > > panel, in the consumer lag page, the offset didn't reset on the input
> > > topic, so it was probably something after that.
> > >
> > > Anyway, thanks a lot for helping; if we experience that again I'll try
> > > to add more verbose logging to better understand what's going on.
> > >
> > > Have a great day
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > >> Honestly I cannot think of an issue that was fixed in 2.2.1 but not in
> > >> 2.2.0 which could be correlated to your observations:
> > >>
> > >>
> > >> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
> > >>
> > >> If you observed that on the cloud, both partitions of the source topic
> > >> get re-processed from the beginning, then it means the committed offsets
> > >> were somehow lost, and hence the app has to start reading the source
> > >> topic from scratch. If this is a reproducible issue, maybe there are
> > >> some lurking problems in 2.2.0.
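One way to check whether the committed offsets were actually lost (a sketch; the group id equals the Streams `application.id`, and the bootstrap server and client config file below are placeholders for your Confluent Cloud settings):

```shell
# Describe the app's consumer group before and after a restart; if the
# CURRENT-OFFSET column resets (or the group disappears), the committed
# offsets were indeed lost.
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --command-config client.properties \
  --group myapp-id \
  --describe
```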
> > >>
> > >> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
> > >> tagliapietra.alessandro@gmail.com> wrote:
> > >>
> > >> > Yes that's right,
> > >> >
> > >> > could that be the problem? Anyway, so far after upgrading to 2.2.1 from
> > >> > 2.2.0 we haven't experienced that problem anymore.
> > >> >
> > >> > Regards
> > >> >
> > >> > --
> > >> > Alessandro Tagliapietra
> > >> >
> > >> >
> > >> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wa...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > That's right, but local state is used as a "materialized view" of your
> > >> > > changelog topics: if you have nothing locally, then it has to bootstrap
> > >> > > from the beginning of your changelog topic.
> > >> > >
> > >> > > But I think your question was about the source "sensors-input" topic,
> > >> > > not the changelog topic. I looked at the logs from two runs, and it
> > >> > > seems locally your sensors-input has one partition, but on the cloud
> > >> > > your sensors-input has two partitions. Is that right?
> > >> > >
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > >
> > >> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
> > >> > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > >
> > >> > > > Isn't the windowing state stored in the additional state store
> > >> > > > topics that I had to create myself?
> > >> > > >
> > >> > > > Like these I have here:
> > >> > > >
> > >> > > > sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
> > >> > > > sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
> > >> > > >
> > >> > > > Thank you
> > >> > > >
> > >> > > > --
> > >> > > > Alessandro Tagliapietra
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >> > > >
> > >> > > > > If you deploy your streams app into a docker container, you'd need
> > >> > > > > to make sure local state directories are preserved, since otherwise
> > >> > > > > whenever you restart, all the state would be lost and Streams then
> > >> > > > > has to bootstrap from scratch. E.g. if you are using K8s for cluster
> > >> > > > > management, you'd better use stateful sets to make sure local states
> > >> > > > > are preserved across re-deployments.
> > >> > > > >
> > >> > > > >
> > >> > > > > Guozhang
> > >> > > > >
> > >> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
> > >> > > > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Hi Guozhang,
> > >> > > > > >
> > >> > > > > > sorry, by "app" I mean the stream processor app, the one shown
> > >> > > > > > in pipeline.kt.
> > >> > > > > >
> > >> > > > > > The app reads a topic of data sent by a sensor each second and
> > >> > > > > > generates a 20-second windowed output to another topic.
> > >> > > > > > My "problem" is that when running locally with my local kafka
> > >> > > > > > setup, if I stop it and start it again, it continues processing
> > >> > > > > > from the last window.
> > >> > > > > > When deploying the app into a docker container and using
> > >> > > > > > confluent cloud as the broker, every time I restart the app it
> > >> > > > > > starts processing again from the beginning of the input topic and
> > >> > > > > > regenerates old windows it has already processed.
> > >> > > > > >
> > >> > > > > > In the meantime I'm trying to upgrade to kafka 2.2.1 to see if
> > >> > > > > > I get any improvement.
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > Alessandro Tagliapietra
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > > > Hello Alessandro,
> > >> > > > > > >
> > >> > > > > > > What did you do for `restarting the app online`? I'm not sure
> > >> > > > > > > I follow the difference between "restart the streams app" and
> > >> > > > > > > "restart the app online" from your description.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > Guozhang
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
> > >> > > > > > > tagliapietra.alessandro@gmail.com> wrote:
> > >> > > > > > > >
> > >> > > > > > > > Hello everyone,
> > >> > > > > > > >
> > >> > > > > > > > I have a small streams app; the configuration and part of the
> > >> > > > > > > > code I'm using can be found here:
> > >> > > > > > > > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
> > >> > > > > > > > There's also the log when the app is started locally and when
> > >> > > > > > > > the app is started on our servers connecting to the confluent
> > >> > > > > > > > cloud kafka broker.
> > >> > > > > > > >
> > >> > > > > > > > The problem is that locally everything works properly: if I
> > >> > > > > > > > restart the streams app it just continues where it left off,
> > >> > > > > > > > but if I restart the app online it reprocesses the whole
> > >> > > > > > > > topic.
> > >> > > > > > > >
> > >> > > > > > > > That shouldn't happen, right?
> > >> > > > > > > >
> > >> > > > > > > > Thanks in advance
> > >> > > > > > > >
> > >> > > > > > > > --
> > >> > > > > > > > Alessandro Tagliapietra
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > -- Guozhang
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > -- Guozhang
> > >> > > > >
> > >> > > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > -- Guozhang
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
>

Re: Streams reprocessing whole topic when deployed but not locally

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Alessandro,

I am not sure I understand your issue completely. If you start your
streams app in a new container without any existing local state, then
it is expected that the changelog topics are read from the beginning
to restore the local state stores. Am I misunderstanding you?
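To make that full restore go away across container restarts, the usual fix is to put the Streams `state.dir` on a persistent volume. A minimal Kubernetes sketch (names, image, mount path, and storage size are all placeholders; the mountPath must match whatever `state.dir` the app is configured with):

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: myapp-streams            # placeholder
spec:
  serviceName: myapp-streams
  replicas: 1
  selector:
    matchLabels: { app: myapp-streams }
  template:
    metadata:
      labels: { app: myapp-streams }
    spec:
      containers:
        - name: streams
          image: myrepo/myapp:latest     # placeholder
          volumeMounts:
            - name: state
              mountPath: /var/lib/kafka-streams   # must equal state.dir
  volumeClaimTemplates:
    - metadata:
        name: state
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi
```

With this in place, a restarted pod reattaches the same volume, finds its local RocksDB state plus checkpoint files, and only replays the changelog tail instead of the whole topic.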

Best,
Bruno

On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
<ta...@gmail.com> wrote:
> [...]