Posted to issues@beam.apache.org by "Eleanore Jin (Jira)" <ji...@apache.org> on 2020/05/07 04:27:00 UTC

[jira] [Updated] (BEAM-9914) Cache Main input while no data from side input could cause OOM

     [ https://issues.apache.org/jira/browse/BEAM-9914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eleanore Jin updated BEAM-9914:
-------------------------------
    Description: 
I am running Beam 2.16 with Flink 1.8.2. My pipeline has a side input that reads a compacted Kafka topic from the earliest offset, and the side-input value is used for filtering. I keep getting an OOM error: GC overhead limit exceeded.
 
 After some more experimentation, I observed the following:
 1. Run the pipeline without the side input: no OOM issue.
 2. Run the pipeline with the side input (Kafka topic with 1 partition) and data available from it: no OOM issue.
 3. Run the pipeline with the side input (Kafka topic with 1 partition) {color:#ff0000}without{color} {color:#000000}any data from the side input:{color} {color:#ff0000}*_OOM issue_*{color}
  
 {color:#ff0000}According to an email conversation with [~angoenka], Beam buffers the elements of the main input for as long as the side input has produced no data.{color}
  
 From the Flink side:
 Flink's broadcast stream does *not* block or collect events from the non-broadcast side if the broadcast side doesn't emit any events.
 However, user-implemented operators (Beam or your own code, in this case) often put non-broadcast events into state while they wait for input from the other side.
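The buffering behaviour described above can be modelled with a small sketch. This is a simplified, single-key illustration in Python, not Beam's or Flink's actual runner code; the class `PushbackFilter`, its methods, and the `is_allowed` predicate are hypothetical names. It assumes the side input is a set of allowed keys and that, until the first side-input value arrives, every main-input element is held in a buffer instead of being filtered.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, Set

# Hypothetical predicate type: (main-input element, side-input value) -> keep it?
Predicate = Callable[[str, Set[str]], bool]


class PushbackFilter:
    """Toy model of a filter whose side input may not be ready yet.

    Illustrative only: not the Beam or Flink runner implementation, just the
    buffer-until-the-side-input-is-ready pattern described in the issue.
    """

    def __init__(self, predicate: Predicate) -> None:
        self.predicate = predicate
        self.side_value: Optional[Set[str]] = None  # None = side input not ready
        self.pushed_back: Deque[str] = deque()  # main-input elements held back
        self.output: List[str] = []

    def process_main(self, element: str) -> None:
        if self.side_value is None:
            # No side-input data yet, so the element cannot be evaluated and
            # is kept instead (a real runner would keep it in operator state).
            self.pushed_back.append(element)
        elif self.predicate(element, self.side_value):
            self.output.append(element)

    def process_side(self, value: Set[str]) -> None:
        self.side_value = value
        # The side input is ready now: replay every buffered element.
        while self.pushed_back:
            self.process_main(self.pushed_back.popleft())


def is_allowed(element: str, allowed: Set[str]) -> bool:
    return element in allowed


if __name__ == "__main__":
    op = PushbackFilter(is_allowed)

    # Case 3 from the issue: main input keeps flowing, side input stays empty.
    for i in range(100_000):
        op.process_main(f"key-{i % 10}")
    print(len(op.pushed_back), len(op.output))  # 100000 0

    # Case 2 from the issue: once side-input data arrives, the buffer drains.
    op.process_side({"key-1", "key-2"})
    print(len(op.pushed_back), len(op.output))  # 0 20000
```

The first `print` shows the buffer growing linearly with the main input while the side input is silent: nothing is emitted and nothing is freed. In a real runner the held-back elements are typically tracked per window in operator state, so with an unbounded main input and a side input that never produces data, the buffer is bounded only by how much main input arrives, which is consistent with the OOM reported in case 3.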

  was:
I am running Beam 2.16 with Flink 1.8.2. My pipeline has a side input that reads a compacted Kafka topic from the earliest offset, and the side-input value is used for filtering. I keep getting an OOM error: GC overhead limit exceeded.
 
!image-2020-05-06-21-25-55-323.png!
 
After some more experimentation, I observed the following:
1. Run the pipeline without the side input: no OOM issue.
2. Run the pipeline with the side input (Kafka topic with 1 partition) and data available from it: no OOM issue.
3. Run the pipeline with the side input (Kafka topic with 1 partition) {color:#ff0000}without{color} {color:#000000}any data from the side input:{color} {color:#ff0000}*_OOM issue_*{color}
 
{color:#ff0000}{color:#172b4d}According to an email conversation with [~angoenka], Beam buffers the elements of the main input for as long as the side input has produced no data.{color}{color}
 
{color:#ff0000}{color:#172b4d}From the Flink side:{color}{color}
Flink's broadcast stream does *not* block or collect events from the non-broadcast side if the broadcast side doesn't emit any events.
However, user-implemented operators (Beam or your own code, in this case) often put non-broadcast events into state while they wait for input from the other side.


> Cache Main input while no data from side input could cause OOM
> --------------------------------------------------------------
>
>                 Key: BEAM-9914
>                 URL: https://issues.apache.org/jira/browse/BEAM-9914
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>    Affects Versions: 2.16.0
>            Reporter: Eleanore Jin
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)