Posted to users@kafka.apache.org by Sushrut Shivaswamy <su...@gmail.com> on 2020/01/18 04:02:41 UTC

KTable Suppress not working

Hey,

I'm building a Streams application in which I aggregate a stream of events
to get a list of events per key.
`eventStream
.groupByKey(Grouped.with(Serdes.String(), eventSerde))
.windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
.aggregate(
    // the aggregator receives (key, value, aggregate)
    ArrayList::new, (key, event, accum) -> {
        accum.add(event);
        return accum;
})
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
.map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value))
.map(eventProcessor::processEventsWindow)
.to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`

As you can see I'm grouping events by key and capturing windowed lists of
events for further processing.
To be able to process the list of events per key in chunks I added
`suppress()`.
This does not seem to work though.
I get this error multiple times:
`Got error produce response with correlation id 5 on topic-partition
app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying
(2147483646 attempts left). Error: NETWORK_EXCEPTION
WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
metadata error in produce request on partition
shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.. Going to request metadata update now`

When I comment out the suppress() line it works fine but I get a large
number of events in a list while processing chunks since it does not
suppress already evaluated chunks.
Can anyone help me out with what could be happening here?

Regards,
Sushrut

Re: KTable Suppress not working

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
As far as I understand, the aggregated result for one window is not
included in the next window.
A window stays in the state store until it is deleted according to the
store's retention setting; however, the aggregated result for that window
includes only the records that fall within the window's duration.

If your windows overlap (hopping windows, sometimes loosely called sliding
windows), a record can fall into more than one window, so the aggregated
results of two adjacent windows would share some records.

For example, say the records in the first window are (R1, R2, R3, R4) and
those in the second are (R3, R4, R5, R6).
The final stored data would then be [W1, Aggr(R1, R2, R3, R4)] and
[W2, Aggr(R3, R4, R5, R6)].
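
To make the window example concrete, here is a minimal plain-Java sketch (not
Kafka Streams code) of how a record timestamp maps to fixed-size windows.
`WindowSketch`, `tumblingStart`, and `hoppingStarts` are hypothetical names,
and the arithmetic assumes epoch-aligned windows and non-negative
timestamps.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSketch {
    // Start of the single tumbling window [start, start + sizeMs) containing the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Starts of every hopping window [start, start + sizeMs) containing the timestamp.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still contain the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(120L, 50L));
        System.out.println(hoppingStarts(60L, 50L, 25L));
    }
}
```

With a size of 50 ms and an advance of 25 ms, a record at 60 ms belongs to
the windows starting at 25 ms and 50 ms, which is the kind of overlap shown
with R3 and R4 above. With tumbling windows, as in the original topology
where `TimeWindows.of(...)` is used without `advanceBy`, each record lands in
exactly one window.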

As John pointed out, an updated result may or may not be emitted downstream
after each record is aggregated into the window. It depends on how
frequently you commit your data.
So the aggregate is built up in the following way:
Aggr( R1 )
Aggr( R1, R2 )
Aggr( R1, R2, R3 )
Aggr( R1, R2, R3, R4 )

But not all of these intermediate results may be emitted downstream.
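
The extreme case of dropping intermediate results is what
`suppress(Suppressed.untilWindowCloses(...))` is meant to give you: one
result per window, emitted only after the window has closed. The following
is a rough plain-Java simulation of that behaviour, not the Kafka Streams
implementation. `FinalResultSketch` and `finalResults` are hypothetical
names, the "aggregate" is just a record count, and stream time is assumed to
be the highest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FinalResultSketch {
    // Returns "windowStart=count" entries, each emitted once stream time
    // has reached windowStart + sizeMs + graceMs.
    static List<String> finalResults(long[] timestampsMs, long sizeMs, long graceMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // windowStart -> record count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % sizeMs);
            if (start + sizeMs + graceMs > streamTime) {
                open.merge(start, 1, Integer::sum); // window still open: update it
            } // otherwise the record is late for a closed window and is dropped
            // Emit every buffered window whose close time has passed.
            while (!open.isEmpty() && open.firstKey() + sizeMs + graceMs <= streamTime) {
                Map.Entry<Long, Integer> closed = open.pollFirstEntry();
                emitted.add(closed.getKey() + "=" + closed.getValue());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 10, 40, 60, 5, 120};
        System.out.println(finalResults(timestamps, 50, 1));
    }
}
```

Note that in this sketch the last window stays buffered until a later record
advances stream time past its close time, so nothing is emitted for it if
the input stops.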

Hope this helps.

Thanks
Sachin



On Tue, Jan 21, 2020 at 10:25 AM Sushrut Shivaswamy <
sushrut.shivaswamy@gmail.com> wrote:

> Thanks John.
> That partially answers my question.
> I'm a little confused about when a window will expire.
> As you said, I will receive at most 20 events at T2 but as time goes on
> will the data from the first window always be included in the aggregated
> result?

Re: KTable Suppress not working

Posted by Sushrut Shivaswamy <su...@gmail.com>.
Thanks John.
That partially answers my question.
I'm a little confused about when a window will expire.
As you said, I will receive at most 20 events at T2 but as time goes on
will the data from the first window always be included in the aggregated
result?

On Mon, Jan 20, 2020 at 7:55 AM John Roesler <vv...@apache.org> wrote:

> Hi Sushrut,
>
> I have to confess I don’t think I fully understand your last message, but
> I will try to help.
>
> It sounds like maybe you’re thinking that streams would just repeatedly
> emit everything every commit? That is certainly not the case. If there are
> only 10 events in window 1 and 10 in window 2, you would see at most 20
> output events, regardless of any caching or suppression. That is, if you
> disable all caches, you get one output record (an updated aggregation
> result) for each input record. Enabling caches only serves to reduce the
> number.
>
> I hope this helps,
> John

Re: KTable Suppress not working

Posted by John Roesler <vv...@apache.org>.
Hi Sushrut,

I have to confess I don’t think I fully understand your last message, but I will try to help.

It sounds like maybe you’re thinking that Streams would just repeatedly emit everything on every commit? That is certainly not the case. If there are only 10 events in window 1 and 10 in window 2, you would see at most 20 output events, regardless of any caching or suppression. That is, if you disable all caches, you get one output record (an updated aggregation result) for each input record. Enabling caches only serves to reduce the number.
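
The counting argument above can be illustrated with a small plain-Java
simulation (no Kafka dependency). This is only a sketch: `EmitSketch` and its
methods are hypothetical names, the aggregate is a simple per-window count,
and a "commit" is assumed to happen every `recordsPerCommit` records rather
than on a timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EmitSketch {
    // No cache: every input record yields one updated aggregate downstream.
    static int outputsWithoutCache(List<String> windowPerRecord) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        int emitted = 0;
        for (String window : windowPerRecord) {
            counts.merge(window, 1, Integer::sum); // update that window's aggregate
            emitted++;                             // forward the new value immediately
        }
        return emitted;
    }

    // With a cache: updates to the same window overwrite each other until a flush.
    static int outputsWithCache(List<String> windowPerRecord, int recordsPerCommit) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        Map<String, Integer> dirty = new LinkedHashMap<>(); // latest value per window since last flush
        int emitted = 0;
        int sinceCommit = 0;
        for (String window : windowPerRecord) {
            int updated = counts.merge(window, 1, Integer::sum);
            dirty.put(window, updated);
            if (++sinceCommit == recordsPerCommit) {
                emitted += dirty.size(); // flush: one record per updated window
                dirty.clear();
                sinceCommit = 0;
            }
        }
        return emitted + dirty.size(); // final flush of whatever is left
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>(Collections.nCopies(10, "W1"));
        input.addAll(Collections.nCopies(10, "W2"));
        System.out.println(outputsWithoutCache(input));
        System.out.println(outputsWithCache(input, 5));
    }
}
```

In both variants a flush forwards only the windows updated since the
previous flush, so earlier windows are not re-emitted and the output count
never exceeds the input count.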

I hope this helps,
John


On Sat, Jan 18, 2020, at 08:36, Sushrut Shivaswamy wrote:
> Hey John,
> 
> I tried following the docs here about the configs:
> `streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 10 * 1024 * 1024L);
> // Set commit interval to 1 second.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);`
> https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt
> 
> I'm trying to group events by id by accumulating them in a list and then
> split the aggregated list into smaller chunks for processing.
> I have a question about when windows expire and how aggregated values are
> flushed out.
> Let's assume 10 records arrive in window 1 (W1) and 10 more records arrive
> in window 2 (W2) for the same key.
> Assume the cache can hold only 10 records in memory.
> Based on my understanding:
> At T1: 10 records from W1 are flushed
> At T2: 20 records from W1 + W2 are flushed.
> The records from W1 will be duplicated at the next commit time till that
> window expires.
> Is this accurate?
> If it is, can you share any way I can avoid/limit the number of times
> duplicate data is flushed?
> 
> Thanks,
> Sushrut

Re: KTable Suppress not working

Posted by Sushrut Shivaswamy <su...@gmail.com>.
Hey John,

I tried following the docs here about the configs:
`streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
10 * 1024 * 1024L);
// Set commit interval to 1 second.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);`
https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt

I'm trying to group events by id by accumulating them in a list and then
split the aggregated list into smaller chunks for processing.
I have a question about when windows expire and how aggregated values are
flushed out.
Let's assume 10 records arrive in window 1 (W1) and 10 more records arrive
in window 2 (W2) for the same key.
Assume the cache can hold only 10 records in memory.
Based on my understanding:
At T1: 10 records from W1 are flushed
At T2: 20 records from W1 + W2 are flushed.
The records from W1 will be duplicated at the next commit time till that
window expires.
Is this accurate?
If it is, can you share any way I can avoid/limit the number of times
duplicate data is flushed?
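
For reference, the chunking step mentioned above (splitting one aggregated
list into smaller lists) could look roughly like this in plain Java.
`ChunkSketch.chunk` is a hypothetical helper, not part of the application in
this thread, and the chunk size is whatever the downstream processing can
handle.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkSketch {
    // Splits items into consecutive chunks of at most chunkSize elements.
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(chunk(List.of(1, 2, 3, 4, 5), 2));
    }
}
```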

Thanks,
Sushrut






On Sat, Jan 18, 2020 at 12:00 PM Sushrut Shivaswamy <
sushrut.shivaswamy@gmail.com> wrote:

> Thanks John,
> I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> and "COMMIT_INTERVAL_MS_CONFIG" configurations.
>
> Thanks,
> Sushrut

Re: KTable Suppress not working

Posted by Sushrut Shivaswamy <su...@gmail.com>.
Thanks John,
I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
and "COMMIT_INTERVAL_MS_CONFIG" configurations.

Thanks,
Sushrut

On Sat, Jan 18, 2020 at 11:31 AM John Roesler <vv...@apache.org> wrote:

> Ah, I should add, if you actually want to use suppression, or
> you need to resolve a similar error message in the future, you
> probably need to tweak the batch sizes and/or timeout configs
> of the various clients, and maybe the server as well.
>
> That error message kind of sounds like the server went silent
> long enough that the client connection timed out, or maybe it suffered
> a long pause of some kind (GC, de-scheduling, etc.) that caused
> the OS to hang up the socket.
>
> I'm not super familiar with diagnosing these issues; I'm just
> trying to point you in the right direction in case you wanted
> to directly solve the given error instead of trying something
> different.
>
> Thanks,
> -John
>
> On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> > Hi Sushrut,
> >
> > That's frustrating... I haven't seen that before, but looking at the error
> > in combination with what you say happens without suppress makes
> > me think there's a large volume of data involved here. Probably,
> > the problem isn't specific to suppression, but it's just that the
> > interactions on the suppression buffers are pushing the system over
> > the edge.
> >
> > Counterintuitively, adding Suppression can actually increase your
> > broker traffic because the Suppression buffer has to provide resiliency
> > guarantees, so it needs its own changelog, even though the aggregation
> > immediately before it _also_ has a changelog.
> >
> > Judging from your description, you were just trying to batch more, rather
> > than specifically trying to get "final results" semantics for the window
> > results. In that case, you might want to try removing the suppression
> > and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >
> > Hope this helps,
> > -John

Re: KTable Suppress not working

Posted by John Roesler <vv...@apache.org>.
Ah, I should add, if you actually want to use suppression, or
you need to resolve a similar error message in the future, you
probably need to tweak the batch sizes and/or timeout configs
of the various clients, and maybe the server as well.
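
For illustration only, here is a minimal sketch of what such client-side
tweaks could look like for the producer embedded in a Streams app. The
"producer." prefix and the four config names are standard Kafka settings;
the class name and every value below are assumptions chosen for the
example, not tested recommendations for this workload.

```java
import java.util.Properties;

public class ProducerTuningSketch {

    // Builds overrides for the producer that Kafka Streams creates internally.
    // The "producer." prefix scopes each setting to that embedded producer.
    static Properties producerOverrides() {
        Properties props = new Properties();
        // Wait longer for a broker response before the request is failed (assumed value).
        props.setProperty("producer.request.timeout.ms", "60000");
        // Upper bound on send plus retries; must be at least linger.ms + request.timeout.ms.
        props.setProperty("producer.delivery.timeout.ms", "180000");
        // Batch size in bytes per partition (assumed value).
        props.setProperty("producer.batch.size", "65536");
        // How long to wait for a batch to fill before sending it (assumed value).
        props.setProperty("producer.linger.ms", "50");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides; in a real app they would be merged into the
        // Properties passed to the KafkaStreams constructor.
        producerOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These would be merged into the same Properties object that carries
application.id and bootstrap.servers.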

That error message kind of sounds like the server went silent
long enough that the client's connection timed out, or maybe it suffered
a long pause of some kind (GC, de-scheduling, etc.) that caused
the OS to hang up the socket.

I'm not super familiar with diagnosing these issues; I'm just 
trying to point you in the right direction in case you wanted
to directly solve the given error instead of trying something 
different.

Thanks,
-John

Re: KTable Suppress not working

Posted by John Roesler <vv...@apache.org>.
Hi Sushrut,

That's frustrating... I haven't seen that before, but looking at the error
in combination with what you say happens without suppress makes
me think there's a large volume of data involved here. Probably,
the problem isn't specific to suppression, but it's just that the
interactions on the suppression buffers are pushing the system over
the edge.

Counterintuitively, adding Suppression can actually increase your
broker traffic because the Suppression buffer has to provide resiliency
guarantees, so it needs its own changelog, even though the aggregation
immediately before it _also_ has a changelog.

Judging from your description, you were just trying to batch more, rather
than specifically trying to get "final results" semantics for the window
results. In that case, you might want to try removing the suppression
and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
and "COMMIT_INTERVAL_MS_CONFIG" configurations.
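
As a rough sketch of that alternative, the two settings could be raised
like this. The string keys are the values behind the StreamsConfig
constants named above (as of the 2.x releases); the class name, the
helper method, and the numbers passed in main are assumptions for
illustration, and the right values depend on your data volume.

```java
import java.util.Properties;

public class StreamsBatchingConfig {

    // Returns Streams settings that make the record cache absorb more updates
    // per key before they are forwarded downstream.
    static Properties batchingProps(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG: total record cache size in bytes.
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG: caches are flushed on each commit.
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed example values: a 64 MiB cache and a 5 second commit interval.
        Properties props = batchingProps(64L * 1024 * 1024, 5_000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that this only reduces how often intermediate results are emitted.
Unlike suppress(), it does not guarantee a single final result per window.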

Hope this helps,
-John
