Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/13 20:21:29 UTC

[GitHub] [beam] lostluck commented on issue #21817: [Bug]: Difficult to write Go SDK pipelines that stay within memory constraints

lostluck commented on issue #21817:
URL: https://github.com/apache/beam/issues/21817#issuecomment-1154390432

   Thank you for filing the issue! 
   
   From your configuration, you've got [6 threads in parallel per worker](https://gist.github.com/gonzojive/6a5e32dbc5693770cfd07624f8c55bee#file-flink-conf-yaml-L102) 
   
   The short-term fix is to process fewer bundles simultaneously, i.e. reduce that number. The SDK largely expects the Runner to decide how to schedule work, so it has no way to refuse the runner's request to process a bundle other than failing the bundle.
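   
   A minimal example of what that could look like in `flink-conf.yaml` (the exact key and value at the linked line of your gist are an assumption on my part; adjust to whatever it actually sets):
   
   ```yaml
   # Hypothetical excerpt from flink-conf.yaml; fewer task slots per
   # TaskManager means fewer bundles are handed to each SDK worker at once.
   taskmanager.numberOfTaskSlots: 2   # reduced from 6
   ```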
   
   At present the SDK isn't at all aware of how much memory it is using, as it's unclear how the runner or the SDK itself could act on that information.
   
   After all, unless the downloaded files are being streamed straight to the output files in the same DoFn, they will have to be in memory for some time.
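   
   As a rough sketch (the DoFn, its field names, and the URL-to-path mapping below are illustrative, not your pipeline's code), streaming straight to output means copying the HTTP body into the destination file so only a small copy buffer is ever resident:
   
   ```go
   // Illustrative DoFn that streams a download directly to a local file.
   // io.Copy uses a fixed-size buffer, so memory use stays roughly constant
   // regardless of how large the downloaded file is.
   package example
   
   import (
       "io"
       "net/http"
       "os"
       "path/filepath"
   )
   
   type streamToDiskFn struct {
       OutputDir string // hypothetical destination directory
   }
   
   // ProcessElement downloads url and writes the body straight to disk,
   // emitting the written path for downstream DoFns.
   func (f *streamToDiskFn) ProcessElement(url string, emit func(string)) error {
       resp, err := http.Get(url)
       if err != nil {
           return err
       }
       defer resp.Body.Close()
   
       path := filepath.Join(f.OutputDir, filepath.Base(url))
       file, err := os.Create(path)
       if err != nil {
           return err
       }
       defer file.Close()
   
       if _, err := io.Copy(file, resp.Body); err != nil {
           return err
       }
       emit(path)
       return nil
   }
   ```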
   
   ------
   
   Is everything being executed on a single machine rather than a cluster? 
   What does the pipeline look like? Separated into multiple DoFns? Any Aggregations?
   
   
   How big is each of these files? I'll note that, short of streaming a download directly to a file output, there's going to be buffering at least up to the size of the file in question.
   
   -----
   
   I will note that the segment of the heap graph you've provided shows none of the places where allocations are occurring.
   
   ----
   
   That said, here are some areas to look into depending on the pipeline. TBH, as described, neither of these is likely to help.
   
   As implemented, the SDK will buffer some number of elements per bundle being processed (see [datamgr.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L32)). Beyond that, additional elements will not be accepted from the Runner until something has been processed through. This happens using [standard channel blocking](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L454).
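   
   In general terms (an illustrative sketch, not the SDK's actual code; the names and buffer size are made up), that backpressure is just a buffered channel whose sends block once the buffer is full:
   
   ```go
   // Illustrative backpressure sketch: the channel accepts up to bufElements
   // items, and further sends block until the consumer drains something.
   package example
   
   const bufElements = 20 // stands in for the per-bundle buffer cap
   
   func produceAndConsume(data [][]byte, process func([]byte)) {
       ch := make(chan []byte, bufElements)
   
       go func() {
           defer close(ch)
           for _, d := range data {
               ch <- d // blocks while bufElements items are still unprocessed
           }
       }()
   
       for d := range ch {
           process(d) // draining an item unblocks the producer
       }
   }
   ```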
   
   The other place where memory might "back up" is the [Combiner Lifting Cache](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/combine.go#L436). This currently uses a map with a fixed cap on its size before eviction. We would love to make it more memory aware, so that memory pressure evicts elements and allows them to be GC'd. A good mechanism for this hasn't been determined, since in general there's value in keeping the cache as full as possible so that elements are combined before the shuffle.
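   
   For intuition, here's a simplified sketch of the lifting-cache idea (not the actual combine.go code; the int accumulator and all names are made up): partial results accumulate per key in a map, and once the map passes a fixed cap everything is flushed downstream so it can be GC'd:
   
   ```go
   // Simplified combiner-lifting cache: combine values per key in a map,
   // and spill the whole map once it grows past a fixed cap.
   package example
   
   type liftingCache struct {
       cap   int
       accum map[string]int // key -> partial combined value
       emit  func(key string, value int)
   }
   
   func (c *liftingCache) add(key string, value int) {
       c.accum[key] += value // merge into the cached partial result
       if len(c.accum) > c.cap {
           c.flush() // fixed-size eviction, regardless of memory pressure
       }
   }
   
   func (c *liftingCache) flush() {
       for k, v := range c.accum {
           c.emit(k, v)
           delete(c.accum, k) // deleting during range is safe in Go
       }
   }
   ```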

