Posted to dev@beam.apache.org by Ajo Thomas <aj...@gmail.com> on 2021/06/11 15:18:14 UTC

Portable Python pipeline not splitting reads across executors

Hi folks,

I am working on running a Portable Python pipeline on Spark.
The test pipeline is very straightforward where I am trying to read some
avro data in hdfs using avroio (native io and not an external transform)
and write it back to hdfs. Here is the pipeline:

Pipeline:
import apache_beam as beam
from apache_beam import Pipeline

pipeline_options = get_pipeline_options()  # helper that builds the PipelineOptions
input = "hdfs://inputpath/*"
schema = get_schema(input)  # helper that reads the Avro schema from the input
output = "hdfs://outputpath/out"

p = Pipeline(options=pipeline_options)
(p
 | 'read' >> beam.io.ReadFromAvro(input)
 | 'write' >> beam.io.WriteToAvro(file_path_prefix=output, schema=schema,
                                  file_name_suffix=".avro"))
p.run().wait_until_finish()
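
For context, get_pipeline_options() just builds the usual portable runner
options, roughly along these lines (the endpoint, environment type, and boot
command below are placeholders for my setup, not the exact values):

from apache_beam.options.pipeline_options import PipelineOptions

def get_pipeline_options():
    # Placeholder endpoint and SDK harness environment; the real values
    # depend on where the Spark job server and worker boot binary live.
    return PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=PROCESS",
        '--environment_config={"command": "/path/to/sdk_worker/boot"}',
    ])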


Here is the dot rendered representation of the fused pipeline from
SparkPortableRunner:
{
rankdir=LR
0 [label="read/Read/Impulse\nbeam:transform:impulse:v1"]
1 [label="17read/Read/Impulse.None/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
0 -> 1 [style=solid label="1"]
2 [label="40read/Read/Map(<lambda at iobase.py:899>).None/SplitAndSize0/beam:env:process:v1:0\nbeam:runner:executable_stage:v1"]
1 -> 2 [style=solid label="2/SplitAndSize0"]
}

I have assigned about 10 executors for the pipeline run and added some
logging in the IO to see how it is splitting the bundles. I can see that all
the reads happen on a single executor while the other executors sit idle.
Any pointers to resolve this would be appreciated.

Thanks
Ajo

Re: Portable Python pipeline not splitting reads across executors

Posted by Ajo Thomas <aj...@gmail.com>.
Update for anyone who might run into this:

I was able to fix this by reading with beam.io.ReadAllFromAvro instead of
beam.io.ReadFromAvro(input).
After some digging I realized that ReadFromAvro relies on splittable DoFns
to split reads, and splittable DoFns are not supported in the SparkRunner.
Switching to ReadAllFromAvro worked because it is built on
filebasedsource.ReadAllFiles, which uses a different approach to splitting
the work.
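
For reference, the read side of the pipeline now looks roughly like this
(reusing the names from the earlier snippet); ReadAllFromAvro consumes a
PCollection of file patterns, so the pattern goes in through beam.Create:

p = Pipeline(options=pipeline_options)
(p
 | 'patterns' >> beam.Create([input])   # PCollection of file patterns
 | 'read' >> beam.io.ReadAllFromAvro()
 | 'write' >> beam.io.WriteToAvro(file_path_prefix=output, schema=schema,
                                  file_name_suffix=".avro"))
p.run().wait_until_finish()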

Thanks
Ajo

