Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 12:36:27 UTC

[GitHub] [beam] damccorm opened a new issue, #19652: Intermittent empty accumulator values in extractOutput of Combine.perKey on Dataflow

damccorm opened a new issue, #19652:
URL: https://github.com/apache/beam/issues/19652

   We are using Spotify’s scio 0.7.2, which is built with Apache Beam 2.10.0, on Google Dataflow in streaming mode with fixed windows.
   
   With these versions we have observed strange and, unfortunately, intermittent behaviour in the Combine.perKey transform, which we use to perform a reduce operation, e.g. emitting the maximum value per key, or the value of the last element with that key in the window.
   
   Such reductions are implemented in scio as a Combine.CombineFn whose accumulator is created as an empty ArrayList; extractOutput performs the actual reduction and returns the output value.
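   To make the failure mode concrete, here is a minimal Python sketch of that CombineFn contract. The method names mirror Beam's CombineFn, but the Beam SDK is not imported, and "last element in window" is just one illustrative reduction (the class name is ours, not scio's):

   ```python
   class LastValueFn:
       def create_accumulator(self):
           # scio-style: start from an empty list.
           return []

       def add_input(self, accumulator, value):
           accumulator.append(value)
           return accumulator

       def merge_accumulators(self, accumulators):
           merged = []
           for acc in accumulators:
               merged.extend(acc)
           return merged

       def extract_output(self, accumulator):
           # The actual reduction happens here. An empty accumulator
           # has no meaningful "last" value -- this is exactly the
           # failure mode described in this issue.
           if not accumulator:
               raise ValueError("empty accumulator at trigger time")
           return accumulator[-1]
   ```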
   
   This works well as long as the combine accumulator is non-empty at trigger time. My understanding is that no trigger should fire if no input messages were processed for a given key in a given window; conversely, if a trigger does fire, we may assume there was at least one element with that key in that window, and it should be in the accumulator.
   
   The transform is part of a job consisting of 40-50 transforms that consumes messages from two different PubSub topics, then transforms, windows, combines and joins them before emitting output messages to a PubSub topic. Messages in the input topics are pulled at 5-300 per second, depending on the time of day.
   
   We ran this job split into 3 separate jobs for 6 months and observed no similar problems, but that setup was not optimal, as each of those jobs used only 10-30% of worker CPU. It was after we combined those separate jobs into one that we started observing exceptions in the step using this specific transform, for which the direct cause is an empty accumulator at the time the window triggers fire. Those errors happened on subscriptions with a 1-hour retention, and the CPUs were quite stressed at the time.
   
   We tried changing the machine type to a larger one (“-n2” -> “-n4”), and an hour of retention was consumed without errors. After another successful try with 3 hours of retention, we tried consuming 6 hours of retention, which again failed.
   
   We have found similar issues at scio's bugtracker:
   
   [https://github.com/spotify/scio/issues/778](https://github.com/spotify/scio/issues/778)
   
   [https://github.com/spotify/scio/issues/1620](https://github.com/spotify/scio/issues/1620)
   
   The workaround proposed there is a custom `aggregateByKey` transform, also based on Combine.perKey, which takes a `zero` value that is output when the accumulator is empty. We used this workaround, but it is not optimal, as in some cases there is no good default value, e.g. for the last/first message in a window.
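   The workaround can be sketched like this (illustrative Python, not scio's actual `aggregateByKey` API; `zero` is the caller-supplied default, and max is used as an example reduction because it has a natural zero):

   ```python
   class MaxWithZeroFn:
       def __init__(self, zero):
           # Caller-supplied default, emitted when the accumulator
           # is empty instead of failing.
           self.zero = zero

       def create_accumulator(self):
           return []

       def add_input(self, accumulator, value):
           accumulator.append(value)
           return accumulator

       def merge_accumulators(self, accumulators):
           return [v for acc in accumulators for v in acc]

       def extract_output(self, accumulator):
           # Works for reductions like max, but for "last message in
           # window" there is no sensible zero -- hence the limitation
           # noted above.
           return max(accumulator) if accumulator else self.zero
   ```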
   
   While searching through Beam's Jira I found an issue that may be similar to ours: https://issues.apache.org/jira/browse/BEAM-7614
   
   I assume that these issues happen when CPU, memory or both are stressed during the catch-up phase after starting a job with some retention to consume.
   
   Imported from Jira [BEAM-7639](https://issues.apache.org/jira/browse/BEAM-7639). Original Jira may contain additional context.
   Reported by: piter.

