Posted to user@beam.apache.org by Yomal de Silva <yo...@gmail.com> on 2023/07/17 11:33:44 UTC

[Question] Processing chunks of data in batch based pipelines

Hi all,

I have a pipeline which reads data from a database (PostgreSQL), enriches the
data through a side input, and finally publishes the results to Kafka.
Currently I am not using the built-in JdbcIO to read the data, but I think
there won't be any difference in using that. In my implementation I have
set the fetch size and pass the data on to the next transform to process. I
have 2 questions here:

1. For batch processing pipelines, is there a way to process elements
in chunks rather than reading the entire dataset and loading it into
memory? What I have observed is that the read occupies a significant amount
of memory and may even cause OOM exceptions. I am looking for some sort of
backpressure implementation, or any other way to stop reading all the data
into memory until some of the records get processed. I have found the
following answer [1], which states that this is not possible; since that
answer was provided some time ago, I wanted to check whether it is still
the case.

2. When dealing with side inputs, again, does Beam load everything into
memory and use the appropriate window to carry out the operation inside a
transform?
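
For context, the hand-rolled read is roughly along the lines of the sketch
below (simplified; the connection details, query handling, and column names
are placeholders rather than the actual code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.beam.sdk.transforms.DoFn;

// Simplified sketch of a hand-rolled JDBC read: takes a query as input,
// streams the result set, and emits one element per row.
public class PostgresReadFn extends DoFn<String, String> {

  @ProcessElement
  public void processElement(@Element String query, OutputReceiver<String> out)
      throws Exception {
    try (Connection conn =
        DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/mydb", "user", "password")) {
      // The PostgreSQL driver only streams with a cursor when autocommit is
      // off; otherwise the whole result set is materialized on the client.
      conn.setAutoCommit(false);
      try (PreparedStatement stmt = conn.prepareStatement(query)) {
        stmt.setFetchSize(10_000); // a hint to the driver, not a hard limit
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            out.output(rs.getString("payload"));
          }
        }
      }
    }
  }
}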

Please let me know if you have any solutions for this.

[1]
https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam

Thank you.

Re: [Question] Processing chunks of data in batch based pipelines

Posted by Alexey Romanenko <ar...@gmail.com>.
Reading from an RDBMS and processing the data downstream in your pipeline are not the same in terms of bundling.

The main “issue” with the former is that such reads mostly happen in a single thread per SQL query, and a JDBC client is no exception. So Beam can’t split data that has not yet been read into bundles.

How many records it keeps in memory is up to the corresponding JDBC driver, and “fetch size” is just a hint for it. So it depends on the driver, not on Beam, in this case.

As a workaround, as I suggested in my earlier reply, a “partitioned” version of read was developed for JdbcIO; it has some weaknesses but may help in some situations.
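
A rough sketch of such a partitioned read, just as an illustration (the
table name, partition column, number of partitions, and row mapper are
placeholders, and the usual JdbcIO and Pipeline imports are assumed):

// Illustration only: JdbcIO.readWithPartitions() splits the read into
// several ranged sub-queries over a numeric column instead of one big query.
PCollection<String> rows =
    pipeline.apply("PartitionedRead",
        JdbcIO.<String>readWithPartitions()   // ranges over a Long column by default
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withTable("source_table")
            .withPartitionColumn("id")        // indexed numeric column to range over
            .withNumPartitions(10)            // number of ranged sub-queries
            .withRowMapper(rs -> rs.getString("payload")));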

—
Alexey



Re: [Question] Processing chunks of data in batch based pipelines

Posted by Yomal de Silva <yo...@gmail.com>.
Hi Alexey,

Yes, I have tried changing the fetch size in my implementation. What I
observed through the Flink dashboard was that the reading transform
completes quickly while one of the other transforms takes a much longer
time (due to some logic).

Even if Apache Beam processes data in bundles, when reading from a data
source like a database it would not wait until a single bundle reaches the
end of the pipeline before reading more. Is that understanding correct? So
it will eventually read the entire dataset, loading it into memory.

I haven't tried the 2nd option you suggested. Will try it out.

Thank you


Re: [Question] Processing chunks of data in batch based pipelines

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Yomal,

Actually, all data in a Beam pipeline is usually processed in bundles (or chunks) when it is processed by a DoFn. The size of a bundle is up to your processing engine and, IIRC, there is no way in Beam to change it.
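
As an illustration, those bundle boundaries can be observed with the
@StartBundle / @FinishBundle hooks on a DoFn, which the runner invokes once
per bundle (sketch only; names are placeholders):

import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: counts elements per bundle; the runner decides how big
// each bundle is.
public class LogBundleSizeFn extends DoFn<String, String> {
  private transient int elementsInBundle;

  @StartBundle
  public void startBundle() {
    elementsInBundle = 0;
  }

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    elementsInBundle++;
    out.output(element);
  }

  @FinishBundle
  public void finishBundle() {
    System.out.println("Finished a bundle of " + elementsInBundle + " elements");
  }
}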

Talking about your case: did you try to change the fetch size for Beam’s JdbcIO connector or for your own one?
Normally, it just gives a hint to the JDBC driver as to the number of rows that should be fetched from the database [1].
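
With the built-in connector, that hint would be set roughly like this
(connection details, query, and row mapper are placeholders, and the usual
JdbcIO and Pipeline imports are assumed):

PCollection<String> rows =
    pipeline.apply("ReadFromPostgres",
        JdbcIO.<String>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withQuery("SELECT payload FROM source_table")
            .withFetchSize(10_000)  // forwarded to the driver's setFetchSize(); only a hint
            .withRowMapper(rs -> rs.getString("payload")));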

Another option could be to try to read the data with JdbcIO.readWithPartitions(), which will execute several instances of the query on the same table
using ranges [2].

—
Alexey

[1] https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
[2] https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-
