Posted to user@flink.apache.org by Eleanore Jin <el...@gmail.com> on 2020/05/03 01:38:54 UTC

Broadcast stream causing GC overhead limit exceeded

Hi All,

I am using Apache Beam with Flink (1.8.2). In my job, I use a Beam
side input (which translates into a Flink non-keyed broadcast stream) to
filter the data from the main stream.

I have been continuously experiencing OOM: GC overhead limit exceeded.

After some experiments, I observed the following behaviour:
1. run the job without the side input (broadcast stream): no OOM issue
2. run the job with the side input (Kafka topic with 1 partition) with data
available from the side input: no OOM issue
3. run the job with the side input (Kafka topic with 1 partition) without any
data from the side input: *OOM issue*
4. in the heap dump, the messages (of type ObjectNode) cannot be GC'd,
apparently due to references held by the broadcast stream
[image: image.png]

My question is: what is the behaviour of the broadcast stream if there is no
data available? Does it cache the data from the main stream and wait until
data becomes available from the broadcast stream before processing?
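To make the question concrete, here is a rough illustration in plain Java of the behaviour I suspect. This is a made-up model, not the actual Flink or Beam API; all class and method names are hypothetical. It shows how an operator that waits for a broadcast value can end up buffering every main-stream record on the heap, matching observation 3 above:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (NOT the real Flink/Beam API) of an operator that
// combines a main stream with a broadcast side input. While no broadcast
// value has arrived, every main-stream record is held in memory.
class BufferingSideInputModel<T, B> {
    private B broadcastValue;                       // null until the side input delivers data
    private final List<T> buffer = new ArrayList<>();

    // Called once per main-stream record.
    void onMainRecord(T record) {
        if (broadcastValue == null) {
            buffer.add(record);                     // heap usage grows without bound
        }
        // else: process the record against broadcastValue immediately
    }

    // Called when the side input finally delivers a value.
    void onBroadcastRecord(B value) {
        broadcastValue = value;
        buffer.clear();                             // buffered records can now be processed and released
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

If this model is accurate, a side-input topic that never receives any data would explain the heap filling up with unreclaimable ObjectNode instances.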

Thanks a lot!
Eleanore

Re: Broadcast stream causing GC overhead limit exceeded

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Eleanore,

Thanks for sharing your findings with us. :-)

Cheers, Fabian


Re: Broadcast stream causing GC overhead limit exceeded

Posted by Eleanore Jin <el...@gmail.com>.
Hi Fabian,

I just got confirmation from the Apache Beam community: Beam will buffer the
data until there is data from the broadcast stream.

Thanks!
Eleanore


Re: Broadcast stream causing GC overhead limit exceeded

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Eleanore,

The "GC overhead limit exceeded" error shows that the JVM spends far too
much time garbage collecting and recovers only little memory with each run.
Since the program makes no progress in such a situation, it is terminated
with the GC overhead error. This typically happens when lots of temporary
objects are created.
The root cause could be Flink, Beam, or your own code.
It's important to understand that this error is not directly related to a
shortage of memory (although more memory can help mitigate the issue a
bit) but rather indicates an implementation issue.

Coming back to your question, Flink's broadcast stream does *not* block or
collect events from the non-broadcast side if the broadcast side doesn't
serve events.
However, user-implemented operators (Beam, or your code in this case)
often put non-broadcast events into state to wait for input from the
other side.
Since the error is not about a lack of memory, the buffering in Flink state
might not be the problem here.
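To illustrate what such a user-implemented operator typically looks like, here is a minimal plain-Java sketch of the two-callback pattern: the framework delivers both inputs, but the user function parks main-stream elements in state until the broadcast side has data, then drains the backlog. This is a stand-in for illustration only, not the real BroadcastProcessFunction API; the class and method names merely mirror it:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of a filtering function over a broadcast-connected
// stream. Mirrors (but is not) Flink's BroadcastProcessFunction.
class FilteringBroadcastFunction {
    private Predicate<String> filter;                        // stands in for broadcast state
    private final Deque<String> pending = new ArrayDeque<>(); // parked main-stream elements

    // Corresponds to processElement(): called per main-stream record.
    List<String> processElement(String value) {
        if (filter == null) {
            pending.add(value);                              // no broadcast data yet: buffer in state
            return List.of();
        }
        return filter.test(value) ? List.of(value) : List.of();
    }

    // Corresponds to processBroadcastElement(): called per broadcast record.
    List<String> processBroadcastElement(Predicate<String> newFilter) {
        filter = newFilter;
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {                         // drain the backlog now that we can filter
            String v = pending.poll();
            if (filter.test(v)) out.add(v);
        }
        return out;
    }
}
```

The buffering in processElement() is exactly where heap usage grows unboundedly if the broadcast side never delivers a record.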

Best, Fabian
