Posted to user@beam.apache.org by Feba Fathima <fe...@gmail.com> on 2020/12/15 09:10:44 UTC

Doubts on Looping inside a beam transform. Processing sequentially using Apache Beam

Hi,

   We are creating a Beam pipeline to do batch processing of data bundles.
The pipeline reads records using CassandraIO. We want to process the data
in batches of 30 min, then group/stitch the 30 min data and write it to
another table. I have 300 bundles for each employee, and we need to process
at least 50 employees using limited resources (~2Gi). But currently the
heap usage is so high that we are only able to process 1 employee (with
~4Gi). If we give it more data, we get out-of-memory/heap errors.

Is there a way to process 1 employee at a time, like a loop, so that we can
process all employees sequentially with our ~2Gi?

We have also posted the same question on Stack Overflow and have not
received any help there so far.

https://stackoverflow.com/questions/65274909/looping-inside-a-beam-transform-process-sequentially-using-apache-beam

Kindly guide us through this if someone is familiar with the scenario.

--
Thanks & Regards,
Feba Fathima

Re: Doubts on Looping inside a beam transform. Processing sequentially using Apache Beam

Posted by Vincent Marquez <vi...@gmail.com>.
Hi Feba, I can't say for sure *where* your pipeline is running out of
memory, but I'm going to guess that it's because CassandraIO currently can
only read an entire table or run a single attached query. So if you are
calling CassandraIO.read() to grab all the "user data", it's going to load
as much of it as possible.

I have a pull request to add a readAll() method to CassandraIO that should
allow you to do what you want.  Ismeal and I have been working on it on and
off for quite some time, and we are hoping to get it merged this month.  The
way readAll works is that it receives as INPUT which query/data needs to be
retrieved from Cassandra, so it can be used beyond just the first step of
the pipeline.  We are using this quite a lot (from my branch) at my
current gig for pipelines similar to yours.  Here is what it would look
like:

CassandraIO.<User>read() --->  MapElements into a query for readAll()  --->
CassandraIO.<UserData>readAll()  ---> aggregation of user data ---> output


This way if you only have one thread doing the aggregation of user data,
you'll in effect only be doing one user at a time.  I'm not sure when
exactly readAll will be merged in, but you could also write your own
connector that does something similar by copying my code (or taking
inspiration from it, etc).
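
To make the "one user at a time" idea concrete, here is a minimal sketch in
plain Java with no Beam or Cassandra dependency. It is not the readAll()
implementation and does not use the CassandraIO API. The class name, the
Reading type, fetchOne, and sink are all hypothetical stand-ins: fetchOne
plays the role of a per-employee query and sink plays the role of the write
to the output table. The point is only that memory is bounded by one
employee's data, which is then grouped into 30-minute windows.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class PerEmployeeSketch {

    /** Hypothetical record shape: one reading belonging to one employee. */
    public static final class Reading {
        final String employeeId;
        final Instant timestamp;
        final String payload;

        public Reading(String employeeId, Instant timestamp, String payload) {
            this.employeeId = employeeId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    static final Duration WINDOW = Duration.ofMinutes(30);

    /** Returns the start of the 30-minute window that contains t. */
    public static Instant windowStart(Instant t) {
        long w = WINDOW.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(t.toEpochMilli(), w) * w);
    }

    /** Groups one employee's readings by 30-minute window, ordered by window start. */
    public static Map<Instant, List<Reading>> stitch(List<Reading> readings) {
        Map<Instant, List<Reading>> byWindow = new TreeMap<>();
        for (Reading r : readings) {
            byWindow.computeIfAbsent(windowStart(r.timestamp), k -> new ArrayList<>()).add(r);
        }
        return byWindow;
    }

    /**
     * Processes employees strictly one after another.
     * fetchOne is an assumed per-employee query; sink is an assumed writer.
     */
    public static void processSequentially(
            List<String> employeeIds,
            Function<String, List<Reading>> fetchOne,
            BiConsumer<String, Map<Instant, List<Reading>>> sink) {
        for (String id : employeeIds) {
            // Only this employee's readings are held in memory during the iteration.
            List<Reading> readings = fetchOne.apply(id);
            sink.accept(id, stitch(readings));
        }
    }

    public static void main(String[] args) {
        // Fake in-memory "query" so the sketch runs without a database.
        Function<String, List<Reading>> fakeFetch = id -> List.of(
                new Reading(id, Instant.ofEpochSecond(0), "a"),
                new Reading(id, Instant.ofEpochSecond(600), "b"),
                new Reading(id, Instant.ofEpochSecond(1800), "c"));
        processSequentially(
                List.of("emp-1", "emp-2"),
                fakeFetch,
                (id, windows) -> System.out.println(id + ": " + windows.size() + " windows"));
    }
}
```

In a real pipeline the loop would be replaced by whatever feeds employee keys
into the per-key read, and peak heap would then depend on how many keys the
runner processes in parallel rather than on the size of the whole table.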

*~Vincent*

