You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tao Wang <Ta...@theice.com> on 2019/10/25 16:12:02 UTC

kafka stream ktable with suppress operator

When using suppress operator with windowed Ktable, it looks like restarting the kafka stream causes the aggregated messages from the SUPPRESS-STATE-STORE published again..

Here is the sudo code .. anything I am missing or anything can be done to avoid this ..


KTable<Windowed<String>, String> test = <KStreamObject>
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
.aggregate(
        ....
        ,
        Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
                    .withRetention(Duration.ofMinutes(5))
                    .with(Serdes.String(), Serdes.String())
   )
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

.toStream()

.to("<topic_out>")





So when restarting the stream app, the <topic_out> will have duplicated messages from a while back ... is this expected behavior ?

Thanks,
Tao Wang





________________________________

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.

Re: kafka stream ktable with suppress operator

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sorry for the late reply.

> You mentioned that, reduce() could use RocksDB as stores by default
> while suppress() is in memory. Is that the reason that reduce() has
> both -repartition and -changelog topics while suppress() only has
> -changelog topic?

No. The repartitioning would happen if you change the record key before
the `reduce()`. Both stores will have a changelog topic (changelogs are
for in-memory and RocksDB stores alike).

> And will that be related to the shutdown hook? If I don't provide
> shutdown hook and perform a redeployment, will it cause above issue?

Well, as long as you call `KafakStreams#close()` when you stop your
instance, it should be fine. However, even if you do not call close(),
the application should not rebalance forever.

> 3) You mentioned reduce() is using RocksDB by default, but I also
> noticed offset has been set to zero for reduce changelog topic. Is it
> wrong?

If you don't use "exactly-once" a checkpoint file should be written on a
regular basis. If you do use "exaclty-once" the checkpoint file is only
written in a clean shutdown (ie, when you call `close()`).

However, I am wondering who you deploy your application? If you use
Kubernetes and don't use stateful sets, RocksDB does not help because a
new volume would be attached to the POD on restart. For this case,
stateful sets should help to avoid reading the full changelog topic.
(You might also consider using "static group membership" for this case).

Overall it's hard to say, and I am wondering a little bit, why
rebalancing happens in 5 minutes intervals. This may indicate that it's
correlated to metadata refresh. But hard to say why.


-Matthias





On 11/4/19 7:42 AM, Xiyuan Hu wrote:
> Hi Matthias,
> 
> Could you help with above issue? Or any suggestions?
> 
> Thanks a lot!
> 
> On Thu, Oct 31, 2019 at 4:00 PM Xiyuan Hu <xi...@gmail.com> wrote:
>>
>> Hi Matthias,
>>
>> Some additional information, after I restart the app, it went to
>> endless rebalancing. Join rate loos like below attachment. It's
>> basically rebalanced every 5 minutes. I checked into each node
>> logging. And found below warning:
>>
>> On node A:
>> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,543 WARN
>> [kafka-coordinator-heartbeat-thread | XXX]
>> o.a.k.c.c.i.AbstractCoordinator [Consumer
>> clientId=XXX-StreamThread-1-consumer, groupId=XXX] This member will
>> leave the group because consumer poll timeout has expired. This means
>> the time between subsequent calls to poll() was longer than the
>> configured max.poll.interval.ms, which typically implies that the poll
>> loop is spending too much time processing messages. You can address
>> this either by increasing max.poll.interval.ms or by reducing the
>> maximum size of batches returned in poll() with max.poll.records.
>> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,544 INFO
>> [kafka-coordinator-heartbeat-thread | XXX]
>> o.a.k.c.c.i.AbstractCoordinator [Consumer
>> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Member
>> XXX-StreamThread-1-consumer-7307ab88-9724-4af8-99b8-5d1c3ef5294f
>> sending LeaveGroup request to coordinator xx:9092 (id: 2147483644
>> rack: null)
>> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,766 INFO
>> [XXX-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store
>> KSTREAM-REDUCE-STATE-STORE-0000000003.1572480000000 in regular mode
>> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,767 INFO
>> [XXX-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader stream-thread
>> [XXX-StreamThread-1] Restoring task 1_3's state store
>> KTABLE-SUPPRESS-STATE-STORE-0000000009 from beginning of the changelog
>> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-3
>> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
>> [XXX-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [Consumer
>> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Revoking previously
>> assigned partitions
>> [XXX-KSTREAM-REDUCE-STATE-STORE-0000000003-repartition-3]
>> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
>> [XXX-StreamThread-1] o.a.k.s.p.internals.StreamThread stream-thread
>> [XXX-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
>> PARTITIONS_REVOKED
>>
>> While on other nodes say: 2019/10/31 10:13:47 Attempt to heartbeat
>> failed since group is rebalancing.
>>
>> If my understanding is correct, above warning caused the group
>> rebalancing? My questions are:
>> 1) Why it only happened after restart?
>> 2) Even if it rebalanced, why it keeps rebalancing like a endless
>> loop? I can't understand the behavior here.
>> 3) You mentioned reduce() is using RocksDB by default, but I also
>> noticed offset has been set to zero for reduce changelog topic. Is it
>> wrong?
>>
>> Thanks a lot for the help!
>>
>> On Thu, Oct 31, 2019 at 8:11 AM Xiyuan Hu <xi...@gmail.com> wrote:
>>>
>>> Hi Matthias,
>>>
>>> When I redeployment the application with the same application Id, it
>>> will cause a rebalance loop: partition revoked -> rebalance -> offset
>>> reset to zero -> partition assigned -> partition revoked.
>>>
>>> The app was running well before the redeployment, but once redeployed,
>>> it will keep rebalancing for hours and I have to switch to a new
>>> application id to stop that.
>>>
>>> You mentioned that, reduce() could use RocksDB as stores by default
>>> while suppress() is in memory. Is that the reason that reduce() has
>>> both -repartition and -changelog topics while suppress() only has
>>> -changelog topic?
>>>
>>> And will that be related to the shutdown hook? If I don't provide
>>> shutdown hook and perform a redeployment, will it cause above issue?
>>>
>>> Thanks!
>>>
>>> On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <ma...@confluent.io> wrote:
>>>>
>>>> Just a follow up: currently, suppress() only supports in-memory stores
>>>> (note, that `suppress()` has it's own store).
>>>>
>>>> For the actually `reduce()` store, you can pick between RocksDB and
>>>> in-memory (default is RocksDB). Hence, if you restart an application on
>>>> the same host, it should not be necessary to reload the state from the
>>>> changelog topic if you use RocksDB.
>>>>
>>>> However, the suppress buffer must be recreated from the
>>>> suppress-changelog topics on restart atm.
>>>>
>>>> Originally, `suppress()` intended to support persistent stores as well,
>>>> but it was not implement yet. We hope to close this gap in the future.
>>>>
>>>>>>> I haven't figured out the reason, but after restart, the app will keep
>>>>>>> reset changelog topic offset to ZERO and trigger rebalance.
>>>>
>>>> Resetting to zero would happen is the full state needs to be recovered.
>>>> However, this should not result in a rebalance. Can you elaborate on the
>>>> rebalancing issue you described?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 10/28/19 5:54 PM, Alex Brekken wrote:
>>>>> I assume you're using RocksDB as your state stores... The bytes out you're
>>>>> seeing on the changelog topics is probably because they are restoring your
>>>>> state stores.  If your state stores are in-memory, then on every
>>>>> application startup they're going to be restored from the changelog
>>>>> topics.  If your state stores are persistent (saved to disk), then a
>>>>> restore can still happen if you've lost your filesystem.  (maybe you're
>>>>> doing a state store cleanup on startup/shutdown, or have temporal storage
>>>>> such as emptyDir in k8s, for example)  So *I think* what you're seeing is
>>>>> normal, though if you want to dig deeper there are rocksdb metrics that can
>>>>> be exposed and will show restore related info.  Additionally, there is a
>>>>> StateRestoreListener interface that you can implement if you'd like to log
>>>>> some of the state store restoration details.
>>>>>
>>>>> Alex
>>>>>
>>>>> On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm using 2.3.1 now and having the same issue. During restarting, I
>>>>>> noticed a lot logging like below:
>>>>>> Seeking to EARLIEST offset of partition
>>>>>> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
>>>>>> Seeking to EARLIEST offset of partition
>>>>>> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
>>>>>>
>>>>>> After restarting, the bytesout of changelog topic is as high as
>>>>>> 800-900MB/s while normally, it has zero bytes out. Is this expected?
>>>>>> I haven't figured out the reason, but after restart, the app will keep
>>>>>> reset changelog topic offset to ZERO and trigger rebalance. It seems a
>>>>>> dead loop?
>>>>>> Rebalance -> reset to ZERO -> rebalance
>>>>>>
>>>>>> Is there any config I should set?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
>>>>>> wrote:
>>>>>>>
>>>>>>> What version are you using? We fixed couple of bugs in `suppress()` -- I
>>>>>>> would recommend to use latest 2.3.1 bug-fix release.
>>>>>>>
>>>>>>>
>>>>>>> -Matthia
>>>>>>>
>>>>>>> On 10/25/19 9:12 AM, Tao Wang wrote:
>>>>>>>> When using suppress operator with windowed Ktable, it looks like
>>>>>> restarting the kafka stream causes the aggregated messages from the
>>>>>> SUPPRESS-STATE-STORE published again..
>>>>>>>>
>>>>>>>> Here is the sudo code .. anything I am missing or anything can be done
>>>>>> to avoid this ..
>>>>>>>>
>>>>>>>>
>>>>>>>> KTable<Windowed<String>, String> test = <KStreamObject>
>>>>>>>> .groupByKey()
>>>>>>>>
>>>>>> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
>>>>>>>> .aggregate(
>>>>>>>>         ....
>>>>>>>>         ,
>>>>>>>>
>>>>>>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
>>>>>>>>                     .withRetention(Duration.ofMinutes(5))
>>>>>>>>                     .with(Serdes.String(), Serdes.String())
>>>>>>>>    )
>>>>>>>>
>>>>>> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>>>>>>>
>>>>>>>> .toStream()
>>>>>>>>
>>>>>>>> .to("<topic_out>")
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> So when restarting the stream app, the <topic_out> will have
>>>>>> duplicated messages from a while back ... is this expected behavior ?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Tao Wang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ________________________________
>>>>>>>>
>>>>>>>> This message may contain confidential information and is intended for
>>>>>> specific recipients unless explicitly noted otherwise. If you have reason
>>>>>> to believe you are not an intended recipient of this message, please delete
>>>>>> it and notify the sender. This message may not represent the opinion of
>>>>>> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
>>>>>> does not constitute a contract or guarantee. Unencrypted electronic mail is
>>>>>> not secure and the recipient of this message is expected to provide
>>>>>> safeguards from viruses and pursue alternate means of communication where
>>>>>> privacy or a binding message is desired.
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>


Re: kafka stream ktable with suppress operator

Posted by Xiyuan Hu <xi...@gmail.com>.
Hi Matthias,

Could you help with above issue? Or any suggestions?

Thanks a lot!

On Thu, Oct 31, 2019 at 4:00 PM Xiyuan Hu <xi...@gmail.com> wrote:
>
> Hi Matthias,
>
> Some additional information, after I restart the app, it went to
> endless rebalancing. Join rate loos like below attachment. It's
> basically rebalanced every 5 minutes. I checked into each node
> logging. And found below warning:
>
> On node A:
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,543 WARN
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] This member will
> leave the group because consumer poll timeout has expired. This means
> the time between subsequent calls to poll() was longer than the
> configured max.poll.interval.ms, which typically implies that the poll
> loop is spending too much time processing messages. You can address
> this either by increasing max.poll.interval.ms or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,544 INFO
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Member
> XXX-StreamThread-1-consumer-7307ab88-9724-4af8-99b8-5d1c3ef5294f
> sending LeaveGroup request to coordinator xx:9092 (id: 2147483644
> rack: null)
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,766 INFO
> [XXX-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store
> KSTREAM-REDUCE-STATE-STORE-0000000003.1572480000000 in regular mode
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,767 INFO
> [XXX-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader stream-thread
> [XXX-StreamThread-1] Restoring task 1_3's state store
> KTABLE-SUPPRESS-STATE-STORE-0000000009 from beginning of the changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-3
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Revoking previously
> assigned partitions
> [XXX-KSTREAM-REDUCE-STATE-STORE-0000000003-repartition-3]
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.s.p.internals.StreamThread stream-thread
> [XXX-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
> PARTITIONS_REVOKED
>
> While on other nodes say: 2019/10/31 10:13:47 Attempt to heartbeat
> failed since group is rebalancing.
>
> If my understanding is correct, above warning caused the group
> rebalancing? My questions are:
> 1) Why it only happened after restart?
> 2) Even if it rebalanced, why it keeps rebalancing like a endless
> loop? I can't understand the behavior here.
> 3) You mentioned reduce() is using RocksDB by default, but I also
> noticed offset has been set to zero for reduce changelog topic. Is it
> wrong?
>
> Thanks a lot for the help!
>
> On Thu, Oct 31, 2019 at 8:11 AM Xiyuan Hu <xi...@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > When I redeployment the application with the same application Id, it
> > will cause a rebalance loop: partition revoked -> rebalance -> offset
> > reset to zero -> partition assigned -> partition revoked.
> >
> > The app was running well before the redeployment, but once redeployed,
> > it will keep rebalancing for hours and I have to switch to a new
> > application id to stop that.
> >
> > You mentioned that, reduce() could use RocksDB as stores by default
> > while suppress() is in memory. Is that the reason that reduce() has
> > both -repartition and -changelog topics while suppress() only has
> > -changelog topic?
> >
> > And will that be related to the shutdown hook? If I don't provide
> > shutdown hook and perform a redeployment, will it cause above issue?
> >
> > Thanks!
> >
> > On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <ma...@confluent.io> wrote:
> > >
> > > Just a follow up: currently, suppress() only supports in-memory stores
> > > (note, that `suppress()` has it's own store).
> > >
> > > For the actually `reduce()` store, you can pick between RocksDB and
> > > in-memory (default is RocksDB). Hence, if you restart an application on
> > > the same host, it should not be necessary to reload the state from the
> > > changelog topic if you use RocksDB.
> > >
> > > However, the suppress buffer must be recreated from the
> > > suppress-changelog topics on restart atm.
> > >
> > > Originally, `suppress()` intended to support persistent stores as well,
> > > but it was not implement yet. We hope to close this gap in the future.
> > >
> > > >>> I haven't figured out the reason, but after restart, the app will keep
> > > >>> reset changelog topic offset to ZERO and trigger rebalance.
> > >
> > > Resetting to zero would happen is the full state needs to be recovered.
> > > However, this should not result in a rebalance. Can you elaborate on the
> > > rebalancing issue you described?
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 10/28/19 5:54 PM, Alex Brekken wrote:
> > > > I assume you're using RocksDB as your state stores... The bytes out you're
> > > > seeing on the changelog topics is probably because they are restoring your
> > > > state stores.  If your state stores are in-memory, then on every
> > > > application startup they're going to be restored from the changelog
> > > > topics.  If your state stores are persistent (saved to disk), then a
> > > > restore can still happen if you've lost your filesystem.  (maybe you're
> > > > doing a state store cleanup on startup/shutdown, or have temporal storage
> > > > such as emptyDir in k8s, for example)  So *I think* what you're seeing is
> > > > normal, though if you want to dig deeper there are rocksdb metrics that can
> > > > be exposed and will show restore related info.  Additionally, there is a
> > > > StateRestoreListener interface that you can implement if you'd like to log
> > > > some of the state store restoration details.
> > > >
> > > > Alex
> > > >
> > > > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:
> > > >
> > > >> Hi,
> > > >> I'm using 2.3.1 now and having the same issue. During restarting, I
> > > >> noticed a lot logging like below:
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> > > >>
> > > >> After restarting, the bytesout of changelog topic is as high as
> > > >> 800-900MB/s while normally, it has zero bytes out. Is this expected?
> > > >> I haven't figured out the reason, but after restart, the app will keep
> > > >> reset changelog topic offset to ZERO and trigger rebalance. It seems a
> > > >> dead loop?
> > > >> Rebalance -> reset to ZERO -> rebalance
> > > >>
> > > >> Is there any config I should set?
> > > >>
> > > >> Thanks!
> > > >>
> > > >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
> > > >> wrote:
> > > >>>
> > > >>> What version are you using? We fixed couple of bugs in `suppress()` -- I
> > > >>> would recommend to use latest 2.3.1 bug-fix release.
> > > >>>
> > > >>>
> > > >>> -Matthia
> > > >>>
> > > >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> > > >>>> When using suppress operator with windowed Ktable, it looks like
> > > >> restarting the kafka stream causes the aggregated messages from the
> > > >> SUPPRESS-STATE-STORE published again..
> > > >>>>
> > > >>>> Here is the sudo code .. anything I am missing or anything can be done
> > > >> to avoid this ..
> > > >>>>
> > > >>>>
> > > >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> > > >>>> .groupByKey()
> > > >>>>
> > > >> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > > >>>> .aggregate(
> > > >>>>         ....
> > > >>>>         ,
> > > >>>>
> > > >>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> > > >>>>                     .withRetention(Duration.ofMinutes(5))
> > > >>>>                     .with(Serdes.String(), Serdes.String())
> > > >>>>    )
> > > >>>>
> > > >> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >>>>
> > > >>>> .toStream()
> > > >>>>
> > > >>>> .to("<topic_out>")
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> So when restarting the stream app, the <topic_out> will have
> > > >> duplicated messages from a while back ... is this expected behavior ?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Tao Wang
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> ________________________________
> > > >>>>
> > > >>>> This message may contain confidential information and is intended for
> > > >> specific recipients unless explicitly noted otherwise. If you have reason
> > > >> to believe you are not an intended recipient of this message, please delete
> > > >> it and notify the sender. This message may not represent the opinion of
> > > >> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
> > > >> does not constitute a contract or guarantee. Unencrypted electronic mail is
> > > >> not secure and the recipient of this message is expected to provide
> > > >> safeguards from viruses and pursue alternate means of communication where
> > > >> privacy or a binding message is desired.
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >

Re: kafka stream ktable with suppress operator

Posted by Xiyuan Hu <xi...@gmail.com>.
Hi Matthias,

Some additional information, after I restart the app, it went to
endless rebalancing. Join rate loos like below attachment. It's
basically rebalanced every 5 minutes. I checked into each node
logging. And found below warning:

On node A:
2019/10/31 10:13:46 | 2019-10-31 10:13:46,543 WARN
[kafka-coordinator-heartbeat-thread | XXX]
o.a.k.c.c.i.AbstractCoordinator [Consumer
clientId=XXX-StreamThread-1-consumer, groupId=XXX] This member will
leave the group because consumer poll timeout has expired. This means
the time between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time processing messages. You can address
this either by increasing max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() with max.poll.records.
2019/10/31 10:13:46 | 2019-10-31 10:13:46,544 INFO
[kafka-coordinator-heartbeat-thread | XXX]
o.a.k.c.c.i.AbstractCoordinator [Consumer
clientId=XXX-StreamThread-1-consumer, groupId=XXX] Member
XXX-StreamThread-1-consumer-7307ab88-9724-4af8-99b8-5d1c3ef5294f
sending LeaveGroup request to coordinator xx:9092 (id: 2147483644
rack: null)
2019/10/31 10:13:52 | 2019-10-31 10:13:52,766 INFO
[XXX-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store
KSTREAM-REDUCE-STATE-STORE-0000000003.1572480000000 in regular mode
2019/10/31 10:13:52 | 2019-10-31 10:13:52,767 INFO
[XXX-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader stream-thread
[XXX-StreamThread-1] Restoring task 1_3's state store
KTABLE-SUPPRESS-STATE-STORE-0000000009 from beginning of the changelog
XXX-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-3
2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
[XXX-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [Consumer
clientId=XXX-StreamThread-1-consumer, groupId=XXX] Revoking previously
assigned partitions
[XXX-KSTREAM-REDUCE-STATE-STORE-0000000003-repartition-3]
2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
[XXX-StreamThread-1] o.a.k.s.p.internals.StreamThread stream-thread
[XXX-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
PARTITIONS_REVOKED

While on other nodes say: 2019/10/31 10:13:47 Attempt to heartbeat
failed since group is rebalancing.

If my understanding is correct, above warning caused the group
rebalancing? My questions are:
1) Why it only happened after restart?
2) Even if it rebalanced, why it keeps rebalancing like a endless
loop? I can't understand the behavior here.
3) You mentioned reduce() is using RocksDB by default, but I also
noticed offset has been set to zero for reduce changelog topic. Is it
wrong?

Thanks a lot for the help!

On Thu, Oct 31, 2019 at 8:11 AM Xiyuan Hu <xi...@gmail.com> wrote:
>
> Hi Matthias,
>
> When I redeployment the application with the same application Id, it
> will cause a rebalance loop: partition revoked -> rebalance -> offset
> reset to zero -> partition assigned -> partition revoked.
>
> The app was running well before the redeployment, but once redeployed,
> it will keep rebalancing for hours and I have to switch to a new
> application id to stop that.
>
> You mentioned that, reduce() could use RocksDB as stores by default
> while suppress() is in memory. Is that the reason that reduce() has
> both -repartition and -changelog topics while suppress() only has
> -changelog topic?
>
> And will that be related to the shutdown hook? If I don't provide
> shutdown hook and perform a redeployment, will it cause above issue?
>
> Thanks!
>
> On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <ma...@confluent.io> wrote:
> >
> > Just a follow up: currently, suppress() only supports in-memory stores
> > (note, that `suppress()` has it's own store).
> >
> > For the actually `reduce()` store, you can pick between RocksDB and
> > in-memory (default is RocksDB). Hence, if you restart an application on
> > the same host, it should not be necessary to reload the state from the
> > changelog topic if you use RocksDB.
> >
> > However, the suppress buffer must be recreated from the
> > suppress-changelog topics on restart atm.
> >
> > Originally, `suppress()` intended to support persistent stores as well,
> > but it was not implement yet. We hope to close this gap in the future.
> >
> > >>> I haven't figured out the reason, but after restart, the app will keep
> > >>> reset changelog topic offset to ZERO and trigger rebalance.
> >
> > Resetting to zero would happen is the full state needs to be recovered.
> > However, this should not result in a rebalance. Can you elaborate on the
> > rebalancing issue you described?
> >
> >
> >
> > -Matthias
> >
> > On 10/28/19 5:54 PM, Alex Brekken wrote:
> > > I assume you're using RocksDB as your state stores... The bytes out you're
> > > seeing on the changelog topics is probably because they are restoring your
> > > state stores.  If your state stores are in-memory, then on every
> > > application startup they're going to be restored from the changelog
> > > topics.  If your state stores are persistent (saved to disk), then a
> > > restore can still happen if you've lost your filesystem.  (maybe you're
> > > doing a state store cleanup on startup/shutdown, or have temporal storage
> > > such as emptyDir in k8s, for example)  So *I think* what you're seeing is
> > > normal, though if you want to dig deeper there are rocksdb metrics that can
> > > be exposed and will show restore related info.  Additionally, there is a
> > > StateRestoreListener interface that you can implement if you'd like to log
> > > some of the state store restoration details.
> > >
> > > Alex
> > >
> > > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:
> > >
> > >> Hi,
> > >> I'm using 2.3.1 now and having the same issue. During restarting, I
> > >> noticed a lot logging like below:
> > >> Seeking to EARLIEST offset of partition
> > >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> > >> Seeking to EARLIEST offset of partition
> > >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> > >>
> > >> After restarting, the bytesout of changelog topic is as high as
> > >> 800-900MB/s while normally, it has zero bytes out. Is this expected?
> > >> I haven't figured out the reason, but after restart, the app will keep
> > >> reset changelog topic offset to ZERO and trigger rebalance. It seems a
> > >> dead loop?
> > >> Rebalance -> reset to ZERO -> rebalance
> > >>
> > >> Is there any config I should set?
> > >>
> > >> Thanks!
> > >>
> > >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
> > >> wrote:
> > >>>
> > >>> What version are you using? We fixed couple of bugs in `suppress()` -- I
> > >>> would recommend to use latest 2.3.1 bug-fix release.
> > >>>
> > >>>
> > >>> -Matthia
> > >>>
> > >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> > >>>> When using suppress operator with windowed Ktable, it looks like
> > >> restarting the kafka stream causes the aggregated messages from the
> > >> SUPPRESS-STATE-STORE published again..
> > >>>>
> > >>>> Here is the sudo code .. anything I am missing or anything can be done
> > >> to avoid this ..
> > >>>>
> > >>>>
> > >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> > >>>> .groupByKey()
> > >>>>
> > >> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > >>>> .aggregate(
> > >>>>         ....
> > >>>>         ,
> > >>>>
> > >>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> > >>>>                     .withRetention(Duration.ofMinutes(5))
> > >>>>                     .with(Serdes.String(), Serdes.String())
> > >>>>    )
> > >>>>
> > >> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >>>>
> > >>>> .toStream()
> > >>>>
> > >>>> .to("<topic_out>")
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> So when restarting the stream app, the <topic_out> will have
> > >> duplicated messages from a while back ... is this expected behavior ?
> > >>>>
> > >>>> Thanks,
> > >>>> Tao Wang
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> ________________________________
> > >>>>
> > >>>> This message may contain confidential information and is intended for
> > >> specific recipients unless explicitly noted otherwise. If you have reason
> > >> to believe you are not an intended recipient of this message, please delete
> > >> it and notify the sender. This message may not represent the opinion of
> > >> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
> > >> does not constitute a contract or guarantee. Unencrypted electronic mail is
> > >> not secure and the recipient of this message is expected to provide
> > >> safeguards from viruses and pursue alternate means of communication where
> > >> privacy or a binding message is desired.
> > >>>>
> > >>>
> > >>
> > >
> >

Re: kafka stream ktable with suppress operator

Posted by Xiyuan Hu <xi...@gmail.com>.
Hi Matthias,

When I redeployment the application with the same application Id, it
will cause a rebalance loop: partition revoked -> rebalance -> offset
reset to zero -> partition assigned -> partition revoked.

The app was running well before the redeployment, but once redeployed,
it will keep rebalancing for hours and I have to switch to a new
application id to stop that.

You mentioned that, reduce() could use RocksDB as stores by default
while suppress() is in memory. Is that the reason that reduce() has
both -repartition and -changelog topics while suppress() only has
-changelog topic?

And will that be related to the shutdown hook? If I don't provide
shutdown hook and perform a redeployment, will it cause above issue?

Thanks!

On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <ma...@confluent.io> wrote:
>
> Just a follow up: currently, suppress() only supports in-memory stores
> (note, that `suppress()` has it's own store).
>
> For the actually `reduce()` store, you can pick between RocksDB and
> in-memory (default is RocksDB). Hence, if you restart an application on
> the same host, it should not be necessary to reload the state from the
> changelog topic if you use RocksDB.
>
> However, the suppress buffer must be recreated from the
> suppress-changelog topics on restart atm.
>
> Originally, `suppress()` intended to support persistent stores as well,
> but it was not implement yet. We hope to close this gap in the future.
>
> >>> I haven't figured out the reason, but after restart, the app will keep
> >>> reset changelog topic offset to ZERO and trigger rebalance.
>
> Resetting to zero would happen is the full state needs to be recovered.
> However, this should not result in a rebalance. Can you elaborate on the
> rebalancing issue you described?
>
>
>
> -Matthias
>
> On 10/28/19 5:54 PM, Alex Brekken wrote:
> > I assume you're using RocksDB as your state stores... The bytes out you're
> > seeing on the changelog topics is probably because they are restoring your
> > state stores.  If your state stores are in-memory, then on every
> > application startup they're going to be restored from the changelog
> > topics.  If your state stores are persistent (saved to disk), then a
> > restore can still happen if you've lost your filesystem.  (maybe you're
> > doing a state store cleanup on startup/shutdown, or have temporal storage
> > such as emptyDir in k8s, for example)  So *I think* what you're seeing is
> > normal, though if you want to dig deeper there are rocksdb metrics that can
> > be exposed and will show restore related info.  Additionally, there is a
> > StateRestoreListener interface that you can implement if you'd like to log
> > some of the state store restoration details.
> >
> > Alex
> >
> > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:
> >
> >> Hi,
> >> I'm using 2.3.1 now and having the same issue. During restarting, I
> >> noticed a lot logging like below:
> >> Seeking to EARLIEST offset of partition
> >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> >> Seeking to EARLIEST offset of partition
> >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> >>
> >> After restarting, the bytesout of changelog topic is as high as
> >> 800-900MB/s while normally, it has zero bytes out. Is this expected?
> >> I haven't figured out the reason, but after restart, the app will keep
> >> reset changelog topic offset to ZERO and trigger rebalance. It seems a
> >> dead loop?
> >> Rebalance -> reset to ZERO -> rebalance
> >>
> >> Is there any config I should set?
> >>
> >> Thanks!
> >>
> >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
> >> wrote:
> >>>
> >>> What version are you using? We fixed couple of bugs in `suppress()` -- I
> >>> would recommend to use latest 2.3.1 bug-fix release.
> >>>
> >>>
> >>> -Matthia
> >>>
> >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> >>>> When using suppress operator with windowed Ktable, it looks like
> >> restarting the kafka stream causes the aggregated messages from the
> >> SUPPRESS-STATE-STORE published again..
> >>>>
> >>>> Here is the sudo code .. anything I am missing or anything can be done
> >> to avoid this ..
> >>>>
> >>>>
> >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> >>>> .groupByKey()
> >>>>
> >> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> >>>> .aggregate(
> >>>>         ....
> >>>>         ,
> >>>>
> >>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> >>>>                     .withRetention(Duration.ofMinutes(5))
> >>>>                     .with(Serdes.String(), Serdes.String())
> >>>>    )
> >>>>
> >> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >>>>
> >>>> .toStream()
> >>>>
> >>>> .to("<topic_out>")
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> So when restarting the stream app, the <topic_out> will have
> >> duplicated messages from a while back ... is this expected behavior ?
> >>>>
> >>>> Thanks,
> >>>> Tao Wang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> ________________________________
> >>>>
> >>>> This message may contain confidential information and is intended for
> >> specific recipients unless explicitly noted otherwise. If you have reason
> >> to believe you are not an intended recipient of this message, please delete
> >> it and notify the sender. This message may not represent the opinion of
> >> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
> >> does not constitute a contract or guarantee. Unencrypted electronic mail is
> >> not secure and the recipient of this message is expected to provide
> >> safeguards from viruses and pursue alternate means of communication where
> >> privacy or a binding message is desired.
> >>>>
> >>>
> >>
> >
>

Re: kafka stream ktable with suppress operator

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Just a follow up: currently, suppress() only supports in-memory stores
(note, that `suppress()` has it's own store).

For the actually `reduce()` store, you can pick between RocksDB and
in-memory (default is RocksDB). Hence, if you restart an application on
the same host, it should not be necessary to reload the state from the
changelog topic if you use RocksDB.

However, the suppress buffer must be recreated from the
suppress-changelog topics on restart atm.

Originally, `suppress()` intended to support persistent stores as well,
but it was not implement yet. We hope to close this gap in the future.

>>> I haven't figured out the reason, but after restart, the app will keep
>>> reset changelog topic offset to ZERO and trigger rebalance.

Resetting to zero would happen is the full state needs to be recovered.
However, this should not result in a rebalance. Can you elaborate on the
rebalancing issue you described?



-Matthias

On 10/28/19 5:54 PM, Alex Brekken wrote:
> I assume you're using RocksDB as your state stores... The bytes out you're
> seeing on the changelog topics is probably because they are restoring your
> state stores.  If your state stores are in-memory, then on every
> application startup they're going to be restored from the changelog
> topics.  If your state stores are persistent (saved to disk), then a
> restore can still happen if you've lost your filesystem.  (maybe you're
> doing a state store cleanup on startup/shutdown, or have temporal storage
> such as emptyDir in k8s, for example)  So *I think* what you're seeing is
> normal, though if you want to dig deeper there are rocksdb metrics that can
> be exposed and will show restore related info.  Additionally, there is a
> StateRestoreListener interface that you can implement if you'd like to log
> some of the state store restoration details.
> 
> Alex
> 
> On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:
> 
>> Hi,
>> I'm using 2.3.1 now and having the same issue. During restarting, I
>> noticed a lot logging like below:
>> Seeking to EARLIEST offset of partition
>> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
>> Seeking to EARLIEST offset of partition
>> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
>>
>> After restarting, the bytesout of changelog topic is as high as
>> 800-900MB/s while normally, it has zero bytes out. Is this expected?
>> I haven't figured out the reason, but after restart, the app will keep
>> reset changelog topic offset to ZERO and trigger rebalance. It seems a
>> dead loop?
>> Rebalance -> reset to ZERO -> rebalance
>>
>> Is there any config I should set?
>>
>> Thanks!
>>
>> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>>
>>> What version are you using? We fixed couple of bugs in `suppress()` -- I
>>> would recommend to use latest 2.3.1 bug-fix release.
>>>
>>>
>>> -Matthia
>>>
>>> On 10/25/19 9:12 AM, Tao Wang wrote:
>>>> When using suppress operator with windowed Ktable, it looks like
>> restarting the kafka stream causes the aggregated messages from the
>> SUPPRESS-STATE-STORE published again..
>>>>
>>>> Here is the sudo code .. anything I am missing or anything can be done
>> to avoid this ..
>>>>
>>>>
>>>> KTable<Windowed<String>, String> test = <KStreamObject>
>>>> .groupByKey()
>>>>
>> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
>>>> .aggregate(
>>>>         ....
>>>>         ,
>>>>
>>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
>>>>                     .withRetention(Duration.ofMinutes(5))
>>>>                     .with(Serdes.String(), Serdes.String())
>>>>    )
>>>>
>> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>>>
>>>> .toStream()
>>>>
>>>> .to("<topic_out>")
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> So when restarting the stream app, the <topic_out> will have
>> duplicated messages from a while back ... is this expected behavior ?
>>>>
>>>> Thanks,
>>>> Tao Wang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> ________________________________
>>>>
>>>> This message may contain confidential information and is intended for
>> specific recipients unless explicitly noted otherwise. If you have reason
>> to believe you are not an intended recipient of this message, please delete
>> it and notify the sender. This message may not represent the opinion of
>> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
>> does not constitute a contract or guarantee. Unencrypted electronic mail is
>> not secure and the recipient of this message is expected to provide
>> safeguards from viruses and pursue alternate means of communication where
>> privacy or a binding message is desired.
>>>>
>>>
>>
> 


Re: kafka stream ktable with suppress operator

Posted by Alex Brekken <br...@gmail.com>.
I assume you're using RocksDB as your state stores... The bytes out you're
seeing on the changelog topics is probably because they are restoring your
state stores.  If your state stores are in-memory, then on every
application startup they're going to be restored from the changelog
topics.  If your state stores are persistent (saved to disk), then a
restore can still happen if you've lost your filesystem.  (maybe you're
doing a state store cleanup on startup/shutdown, or have temporal storage
such as emptyDir in k8s, for example)  So *I think* what you're seeing is
normal, though if you want to dig deeper there are rocksdb metrics that can
be exposed and will show restore related info.  Additionally, there is a
StateRestoreListener interface that you can implement if you'd like to log
some of the state store restoration details.

Alex

On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xi...@gmail.com> wrote:

> Hi,
> I'm using 2.3.1 now and having the same issue. During restarting, I
> noticed a lot logging like below:
> Seeking to EARLIEST offset of partition
> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> Seeking to EARLIEST offset of partition
> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
>
> After restarting, the bytesout of changelog topic is as high as
> 800-900MB/s while normally, it has zero bytes out. Is this expected?
> I haven't figured out the reason, but after restart, the app will keep
> reset changelog topic offset to ZERO and trigger rebalance. It seems a
> dead loop?
> Rebalance -> reset to ZERO -> rebalance
>
> Is there any config I should set?
>
> Thanks!
>
> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> >
> > What version are you using? We fixed couple of bugs in `suppress()` -- I
> > would recommend to use latest 2.3.1 bug-fix release.
> >
> >
> > -Matthia
> >
> > On 10/25/19 9:12 AM, Tao Wang wrote:
> > > When using suppress operator with windowed Ktable, it looks like
> restarting the kafka stream causes the aggregated messages from the
> SUPPRESS-STATE-STORE published again..
> > >
> > > Here is the sudo code .. anything I am missing or anything can be done
> to avoid this ..
> > >
> > >
> > > KTable<Windowed<String>, String> test = <KStreamObject>
> > > .groupByKey()
> > >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > > .aggregate(
> > >         ....
> > >         ,
> > >
>  Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> > >                     .withRetention(Duration.ofMinutes(5))
> > >                     .with(Serdes.String(), Serdes.String())
> > >    )
> > >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >
> > > .toStream()
> > >
> > > .to("<topic_out>")
> > >
> > >
> > >
> > >
> > >
> > > So when restarting the stream app, the <topic_out> will have
> duplicated messages from a while back ... is this expected behavior ?
> > >
> > > Thanks,
> > > Tao Wang
> > >
> > >
> > >
> > >
> > >
> > > ________________________________
> > >
> > > This message may contain confidential information and is intended for
> specific recipients unless explicitly noted otherwise. If you have reason
> to believe you are not an intended recipient of this message, please delete
> it and notify the sender. This message may not represent the opinion of
> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and
> does not constitute a contract or guarantee. Unencrypted electronic mail is
> not secure and the recipient of this message is expected to provide
> safeguards from viruses and pursue alternate means of communication where
> privacy or a binding message is desired.
> > >
> >
>

Re: kafka stream ktable with suppress operator

Posted by Xiyuan Hu <xi...@gmail.com>.
Hi,
I'm using 2.3.1 now and having the same issue. During restarting, I
noticed a lot logging like below:
Seeking to EARLIEST offset of partition
XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
Seeking to EARLIEST offset of partition
XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41

After restarting, the bytesout of changelog topic is as high as
800-900MB/s while normally, it has zero bytes out. Is this expected?
I haven't figured out the reason, but after restart, the app will keep
reset changelog topic offset to ZERO and trigger rebalance. It seems a
dead loop?
Rebalance -> reset to ZERO -> rebalance

Is there any config I should set?

Thanks!

On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <ma...@confluent.io> wrote:
>
> What version are you using? We fixed couple of bugs in `suppress()` -- I
> would recommend to use latest 2.3.1 bug-fix release.
>
>
> -Matthia
>
> On 10/25/19 9:12 AM, Tao Wang wrote:
> > When using suppress operator with windowed Ktable, it looks like restarting the kafka stream causes the aggregated messages from the SUPPRESS-STATE-STORE published again..
> >
> > Here is the sudo code .. anything I am missing or anything can be done to avoid this ..
> >
> >
> > KTable<Windowed<String>, String> test = <KStreamObject>
> > .groupByKey()
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > .aggregate(
> >         ....
> >         ,
> >         Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
> >                     .withRetention(Duration.ofMinutes(5))
> >                     .with(Serdes.String(), Serdes.String())
> >    )
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >
> > .toStream()
> >
> > .to("<topic_out>")
> >
> >
> >
> >
> >
> > So when restarting the stream app, the <topic_out> will have duplicated messages from a while back ... is this expected behavior ?
> >
> > Thanks,
> > Tao Wang
> >
> >
> >
> >
> >
> > ________________________________
> >
> > This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
> >
>

RE: kafka stream ktable with suppress operator

Posted by Tao Wang <Ta...@theice.com>.
I just tested out using kafka stream 2.3.0 and the issue went away.. Restarting the stream application did not cause all the messages to be reproduced again any more.. and the suppressor start producing from when it was shutdown.

Thanks for the help!

Tao

-----Original Message-----
From: Tao Wang
Sent: Tuesday, October 29, 2019 10:09 AM
To: users@kafka.apache.org
Subject: RE: kafka stream ktable with suppress operator

Thanks Matthia for the reply.  I am using kafka-stream-2.1.0 jar. I will try 2.3.1 and see if the issue still there.

Tao



-----Original Message-----
From: Matthias J. Sax [mailto:matthias@confluent.io]
Sent: Sunday, October 27, 2019 11:25 PM
To: users@kafka.apache.org
Subject: Re: kafka stream ktable with suppress operator

What version are you using? We fixed couple of bugs in `suppress()` -- I
would recommend to use latest 2.3.1 bug-fix release.


-Matthia

On 10/25/19 9:12 AM, Tao Wang wrote:
> When using suppress operator with windowed Ktable, it looks like restarting the kafka stream causes the aggregated messages from the SUPPRESS-STATE-STORE published again..
>
> Here is the sudo code .. anything I am missing or anything can be done to avoid this ..
>
>
> KTable<Windowed<String>, String> test = <KStreamObject>
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> .aggregate(
>         ....
>         ,
>         Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
>                     .withRetention(Duration.ofMinutes(5))
>                     .with(Serdes.String(), Serdes.String())
>    )
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>
> .toStream()
>
> .to("<topic_out>")
>
>
>
>
>
> So when restarting the stream app, the <topic_out> will have duplicated messages from a while back ... is this expected behavior ?
>
> Thanks,
> Tao Wang
>
>
>
>
>
> ________________________________
>
> This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
>


________________________________

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.

RE: kafka stream ktable with suppress operator

Posted by Tao Wang <Ta...@theice.com>.
Thanks Matthia for the reply.  I am using kafka-stream-2.1.0 jar. I will try 2.3.1 and see if the issue still there.

Tao



-----Original Message-----
From: Matthias J. Sax [mailto:matthias@confluent.io]
Sent: Sunday, October 27, 2019 11:25 PM
To: users@kafka.apache.org
Subject: Re: kafka stream ktable with suppress operator

What version are you using? We fixed couple of bugs in `suppress()` -- I
would recommend to use latest 2.3.1 bug-fix release.


-Matthia

On 10/25/19 9:12 AM, Tao Wang wrote:
> When using suppress operator with windowed Ktable, it looks like restarting the kafka stream causes the aggregated messages from the SUPPRESS-STATE-STORE published again..
>
> Here is the sudo code .. anything I am missing or anything can be done to avoid this ..
>
>
> KTable<Windowed<String>, String> test = <KStreamObject>
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> .aggregate(
>         ....
>         ,
>         Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
>                     .withRetention(Duration.ofMinutes(5))
>                     .with(Serdes.String(), Serdes.String())
>    )
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>
> .toStream()
>
> .to("<topic_out>")
>
>
>
>
>
> So when restarting the stream app, the <topic_out> will have duplicated messages from a while back ... is this expected behavior ?
>
> Thanks,
> Tao Wang
>
>
>
>
>
> ________________________________
>
> This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
>


________________________________

This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.

Re: kafka stream ktable with suppress operator

Posted by "Matthias J. Sax" <ma...@confluent.io>.
What version are you using? We fixed couple of bugs in `suppress()` -- I
would recommend to use latest 2.3.1 bug-fix release.


-Matthia

On 10/25/19 9:12 AM, Tao Wang wrote:
> When using suppress operator with windowed Ktable, it looks like restarting the kafka stream causes the aggregated messages from the SUPPRESS-STATE-STORE published again..
> 
> Here is the sudo code .. anything I am missing or anything can be done to avoid this ..
> 
> 
> KTable<Windowed<String>, String> test = <KStreamObject>
> .groupByKey()
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> .aggregate(
>         ....
>         ,
>         Materialized.<String,String,WindowStore<String,String>>as("aggregated-stream21-store")
>                     .withRetention(Duration.ofMinutes(5))
>                     .with(Serdes.String(), Serdes.String())
>    )
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> 
> .toStream()
> 
> .to("<topic_out>")
> 
> 
> 
> 
> 
> So when restarting the stream app, the <topic_out> will have duplicated messages from a while back ... is this expected behavior ?
> 
> Thanks,
> Tao Wang
> 
> 
> 
> 
> 
> ________________________________
> 
> This message may contain confidential information and is intended for specific recipients unless explicitly noted otherwise. If you have reason to believe you are not an intended recipient of this message, please delete it and notify the sender. This message may not represent the opinion of Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates, and does not constitute a contract or guarantee. Unencrypted electronic mail is not secure and the recipient of this message is expected to provide safeguards from viruses and pursue alternate means of communication where privacy or a binding message is desired.
>