Posted to issues@beam.apache.org by "Hannah Jiang (Jira)" <ji...@apache.org> on 2020/01/30 23:33:00 UTC

[jira] [Updated] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

     [ https://issues.apache.org/jira/browse/BEAM-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hannah Jiang updated BEAM-9228:
-------------------------------
    Description: 
A user reported the following issue.

-------------------------------------------------
I have a set of TFRecord files, obtained by converting Parquet files with Spark. Each file is roughly 1 GB and I have 11 of them.

I would expect simple statistics gathering (i.e. counting the number of items across all files) to scale linearly with respect to the number of cores on my system.

I am able to reproduce the issue with the minimal snippet below:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns
import sys

pipeline_options = PipelineOptions(['--direct_num_workers', '4'])

file_pattern = 'part-r-00*'
runner=fn_api_runner.FnApiRunner(
          default_environment=beam_runner_api_pb2.Environment(
              urn=python_urns.SUBPROCESS_SDK,
              payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                        % sys.executable.encode('ascii')))

p = beam.Pipeline(runner=runner, options=pipeline_options)

lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
                 | beam.combiners.Count.Globally()
                 | beam.io.WriteToText('/tmp/output'))

p.run()

Only one combination of apache_beam version / worker type seems to work (I refer to https://beam.apache.org/documentation/runners/direct/ for the worker types):
* beam 2.16: neither the multithreaded nor the multiprocess mode achieves high CPU usage on multiple cores
* beam 2.17: able to achieve high CPU usage on all 4 cores
* beam 2.18: the multithreaded mode was not tested, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change from 2.17 to 2.18

I also briefly tried the SparkRunner with version 2.16 but was not able to achieve any throughput.

What is the recommended way to achieve what I am trying to do? How can I troubleshoot?
----------------------------------------------------------------------------------------------------------------------------------------------
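For reference, newer SDK versions offer a simpler way to request the multi-process worker type discussed above, without constructing FnApiRunner and an Environment by hand. This is only a sketch: it assumes an SDK version whose DirectRunner exposes the --direct_running_mode option described on the DirectRunner documentation page linked in the report (on 2.16-2.18 the explicit FnApiRunner construction is still needed), and it only configures the worker type; it does not by itself address the splitting issue described below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# 'multi_processing' corresponds to the multi-process worker type;
# 'multi_threading' and 'in_memory' are the other documented modes.
# Availability of --direct_running_mode depends on the SDK version.
options = PipelineOptions([
    '--direct_running_mode=multi_processing',
    '--direct_num_workers=4',
])

with beam.Pipeline(options=options) as p:
    # same read / count / write steps as in the snippet above
    (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord('part-r-00*')
       | 'count' >> beam.combiners.Count.Globally()
       | 'write' >> beam.io.WriteToText('/tmp/output'))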

This is caused by [this commit|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].

A [workaround|https://github.com/apache/beam/pull/10729] was tried, which rolls back iobase.py so that it does not use _SDFBoundedSourceWrapper. This confirmed that data is shuffled to multiple workers; however, there are some regressions in the SDF wrapper tests.
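A possible user-level mitigation, shown here only as a sketch and not verified against this issue, is to read the files with ReadAllFromTFRecord instead of ReadFromTFRecord: the file patterns are fed in as elements of a PCollection, and the expansion into files/ranges happens via plain DoFns with a reshuffle, rather than via the wrapped BoundedSource, which should let the runner distribute the work.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--direct_num_workers', '4'])

with beam.Pipeline(options=options) as p:
    (p
     | 'patterns' >> beam.Create(['part-r-00*'])  # one or more glob patterns
     | 'read' >> beam.io.tfrecordio.ReadAllFromTFRecord()
     | 'count' >> beam.combiners.Count.Globally()
     | 'write' >> beam.io.WriteToText('/tmp/output'))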

  was:
A user reported the following issue.

-------------------------------------------------
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns
import sys

pipeline_options = PipelineOptions(['--direct_num_workers', '4'])

file_pattern = 'part-r-00*'
runner=fn_api_runner.FnApiRunner(
          default_environment=beam_runner_api_pb2.Environment(
              urn=python_urns.SUBPROCESS_SDK,
              payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                        % sys.executable.encode('ascii')))

p = beam.Pipeline(runner=runner, options=pipeline_options)

lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
           | beam.combiners.Count.Globally()
           | beam.io.WriteToText('/tmp/output'))

p.run()
--------------------------------------------------------------------------------------

This is caused by [this PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].

A [workaround|https://github.com/apache/beam/pull/10729] was tried, which rolls back iobase.py so that it does not use _SDFBoundedSourceWrapper. This confirmed that data is shuffled to multiple workers; however, there are some regressions in the SDF wrapper tests.


> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> --------------------------------------------------------------------
>
>                 Key: BEAM-9228
>                 URL: https://issues.apache.org/jira/browse/BEAM-9228
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.16.0, 2.18.0
>            Reporter: Hannah Jiang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)