Posted to user@beam.apache.org by Hannah Jiang <ha...@google.com> on 2020/02/25 00:57:26 UTC

Re: Unable to reliably have multiple cores working on a dataset with DirectRunner

This issue will be fixed in v2.20.
PR: https://github.com/apache/beam/pull/10847
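
For anyone picking this up on 2.20 or later: the DirectRunner docs describe
selecting the running mode via pipeline options alone. A minimal sketch
(assuming the direct_running_mode option from those docs; the Create/Map
steps are placeholders for illustration):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Run the DirectRunner with 4 subprocess workers.
options = PipelineOptions([
    '--direct_num_workers', '4',
    '--direct_running_mode', 'multi_processing',
])
with beam.Pipeline(options=options) as p:
    _ = (p
         | beam.Create(range(100))
         | beam.Map(lambda x: x * x))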


On Fri, Jan 31, 2020 at 9:52 AM Hannah Jiang <ha...@google.com> wrote:

> Yep, here is the Jira ticket: BEAM-9228
> <https://issues.apache.org/jira/browse/BEAM-9228>
> I just confirmed that the reshuffle operation is not being called at
> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>
>
> On Fri, Jan 31, 2020 at 9:43 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> It's only optionally inserting a reshuffle:
>>
>> https://github.com/apache/beam/blob/release-2.19.0/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py#L926
>>
>> We should at least have a fusion break (marking the downstream stage as
>> must-follow of the upstream) if reshuffle is not a primitive. We also
>> clearly need a better test... I'd be happy to consult on either.
>> Hannah, is there a JIRA for this yet?
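>>
>> As a user-level illustration (a minimal sketch, not the runner-internal
>> fix; the file pattern and output path are placeholders): an explicit
>> Reshuffle between the read and the downstream stages forces elements to
>> be redistributed, i.e. it acts as a fusion break.
>>
>> import apache_beam as beam
>>
>> with beam.Pipeline() as p:
>>     _ = (p
>>          | 'Read' >> beam.io.tfrecordio.ReadFromTFRecord('part-r-00*')
>>          # Reshuffle materializes and redistributes elements, breaking
>>          # fusion between the read and the count.
>>          | 'Redistribute' >> beam.Reshuffle()
>>          | 'Count' >> beam.combiners.Count.Globally()
>>          | 'Write' >> beam.io.WriteToText('/tmp/output'))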
>>
>> On Fri, Jan 31, 2020 at 8:41 AM Luke Cwik <lc...@google.com> wrote:
>> >
>> > Is the DirectRunner inserting a reshuffle or redistribute operation
>> within the SplittableDoFn transform expansion so it looks like
>> (PairWithRestriction -> SplitRestriction -> Reshuffle ->
>> ProcessElementAndRestriction)?
>> >
>> >
>> > On Thu, Jan 30, 2020 at 3:32 PM Hannah Jiang <ha...@google.com>
>> wrote:
>> >>
>> >> Hi Julien
>> >>
>> >> Here are some more updates about the issue.
>> >>
>> >> When we run DirectRunner in multiprocessing or multithreading mode,
>> workers are created as expected. However, there are issues with the
>> _SDFBoundedSourceWrapper class, so some read transforms send all data to a
>> single worker instead of distributing it across workers. As a result, when
>> we check CPU usage, only one subprocess is working and the other workers
>> are idle.
>> >> Unless the pipeline contains some other transform that requires a
>> reshuffle, the dataset is processed by a single worker, which is what
>> happened in your pipeline.
>> >>
>> >> I tried a workaround: rolling back iobase.py so it does not use the
>> _SDFBoundedSourceWrapper class while keeping the other changes as they
>> are. With this change, data is distributed to multiple workers; however,
>> it causes some regressions in the SDF wrapper tests, so it didn't work. I
>> created a ticket for the issue.
>> >>
>> >> Thanks for reporting the issue.
>> >> Hannah
>> >>
>> >>
>> >> On Wed, Jan 29, 2020 at 4:33 PM Hannah Jiang <ha...@google.com>
>> wrote:
>> >>>
>> >>> I have investigated some more.
>> >>> The commit Julien found fixed it for 2.17.0, but another commit broke
>> it again in 2.18, 2.19 (which will be released soon), and head.
>> >>> The root cause hasn't been identified yet; I will keep working on it.
>> >>>
>> >>> So at the moment, only 2.17 works as expected.
>> >>> And from 2.18, the environment setting for FnApiRunner was
>> simplified; you can set up the runner with the following code. The
>> documentation will be updated soon.
>> >>>
>> -----------------------------------------------------------------------
>> >>>
>> >>> import sys
>> >>> from apache_beam.runners.portability import fn_api_runner
>> >>> from apache_beam.transforms import environments
>> >>>
>> >>> command_string = '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
>> >>> runner = fn_api_runner.FnApiRunner(
>> >>>     default_environment=environments.SubprocessSDKEnvironment(
>> >>>         command_string=command_string))
>> >>>
>> >>> -----------------------------------------------------------------------
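>> >>>
>> >>> The runner is then passed to the pipeline as usual, e.g.:
>> >>>
>> >>> p = beam.Pipeline(runner=runner, options=pipeline_options)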
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Jan 29, 2020 at 2:09 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >>>>
>> >>>> > I also briefly tried the SparkRunner with version 2.16 but was not
>> able to achieve any throughput.
>> >>>>
>> >>>> What do you mean by this?
>> >>>>
>> >>>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye <jl...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> I confirm the situation gets better after the commit: 4 cores used
>> for 18 seconds rather than one core used for 50 seconds.
>> >>>>>
>> >>>>> I still need to check whether this fixed the original issue which
>> was with tensorflow_data_validation.
>> >>>>>
>> >>>>> But it's definitely a step in the right direction for me in
>> understanding the issue.
>> >>>>>
>> >>>>> On Wed, Jan 29, 2020 at 9:57 PM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>>>
>> >>>>>> This could influence how sources are read. When you say
>> before/after
>> >>>>>> the commit, is it better now?
>> >>>>>>
>> >>>>>> On Wed, Jan 29, 2020 at 12:10 PM Julien Lafaye <jl...@gmail.com>
>> wrote:
>> >>>>>> >
>> >>>>>> > I took some time to bisect my issue between v2.16.0 & v2.17.0
>> and it looks like the commit below made a difference.
>> >>>>>> >
>> >>>>>> > before the commit: execution time is 50 seconds using the
>> fn_api_runner in multiprocess mode on 4 workers
>> >>>>>> > after the commit: execution time is 18 seconds using the
>> fn_api_runner in multiprocess mode on 4 workers
>> >>>>>> >
>> >>>>>> > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD,
>> refs/bisect/new)
>> >>>>>> > Author: Robert Bradshaw <ro...@gmail.com>
>> >>>>>> > Date:   Fri Oct 18 15:33:10 2019 -0700
>> >>>>>> >
>> >>>>>> >    Make beam_fn_api opt-out rather than opt-in for runners.
>> >>>>>> >
>> >>>>>> >    Also delegate this decision to the runner, rather than
>> checking the string
>> >>>>>> >    value of the flag.
>> >>>>>> >
>> >>>>>> > I looked at the modifications done in this patch, but none
>> struck me as related to my issue.
>> >>>>>> >
>> >>>>>> > On Wed, Jan 29, 2020 at 8:35 AM Julien Lafaye <jl...@gmail.com>
>> wrote:
>> >>>>>> >>
>> >>>>>> >> Hi Hannah,
>> >>>>>> >>
>> >>>>>> >> I used top.
>> >>>>>> >>
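>> >>>>>> >> A Python alternative for sampling per-core usage, as a minimal
>> >>>>>> >> sketch assuming psutil is installed:
>> >>>>>> >>
>> >>>>>> >> import psutil
>> >>>>>> >> # One-second sample of per-core CPU utilization, in percent.
>> >>>>>> >> print(psutil.cpu_percent(interval=1, percpu=True))
>> >>>>>> >>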
>> >>>>>> >> Please let me know if you need any other information that could
>> help in understanding the issue.
>> >>>>>> >>
>> >>>>>> >> J.
>> >>>>>> >>
>> >>>>>> >> On Wed, Jan 29, 2020 at 8:14 AM Hannah Jiang <
>> hannahjiang@google.com> wrote:
>> >>>>>> >>>
>> >>>>>> >>> Hi Julien
>> >>>>>> >>>
>> >>>>>> >>> Thanks for reaching out to the user community. I will look
>> into it. Can you please share how you checked CPU usage for each core?
>> >>>>>> >>>
>> >>>>>> >>> Thanks,
>> >>>>>> >>> Hannah
>> >>>>>> >>>
>> >>>>>> >>> On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <
>> jlafaye@gmail.com> wrote:
>> >>>>>> >>>>
>> >>>>>> >>>> Hello,
>> >>>>>> >>>>
>> >>>>>> >>>> I have a set of tfrecord files, obtained by converting
>> parquet files with Spark. Each file is roughly 1GB and I have 11 of those.
>> >>>>>> >>>>
>> >>>>>> >>>> I would expect simple statistics gathering (i.e. counting the
>> number of items across all files) to scale linearly with respect to the
>> number of cores on my system.
>> >>>>>> >>>>
>> >>>>>> >>>> I am able to reproduce the issue with the minimal snippet
>> below:
>> >>>>>> >>>>
>> >>>>>> >>>> import apache_beam as beam
>> >>>>>> >>>> from apache_beam.options.pipeline_options import PipelineOptions
>> >>>>>> >>>> from apache_beam.runners.portability import fn_api_runner
>> >>>>>> >>>> from apache_beam.portability.api import beam_runner_api_pb2
>> >>>>>> >>>> from apache_beam.portability import python_urns
>> >>>>>> >>>> import sys
>> >>>>>> >>>>
>> >>>>>> >>>> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>> >>>>>> >>>>
>> >>>>>> >>>> file_pattern = 'part-r-00*'
>> >>>>>> >>>> runner = fn_api_runner.FnApiRunner(
>> >>>>>> >>>>     default_environment=beam_runner_api_pb2.Environment(
>> >>>>>> >>>>         urn=python_urns.SUBPROCESS_SDK,
>> >>>>>> >>>>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>> >>>>>> >>>>                 % sys.executable.encode('ascii')))
>> >>>>>> >>>>
>> >>>>>> >>>> p = beam.Pipeline(runner=runner, options=pipeline_options)
>> >>>>>> >>>>
>> >>>>>> >>>> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>> >>>>>> >>>>            | beam.combiners.Count.Globally()
>> >>>>>> >>>>            | beam.io.WriteToText('/tmp/output'))
>> >>>>>> >>>>
>> >>>>>> >>>> p.run()
>> >>>>>> >>>>
>> >>>>>> >>>> Only one combination of apache_beam revision / worker type
>> seems to work (I refer to
>> https://beam.apache.org/documentation/runners/direct/ for the worker
>> types):
>> >>>>>> >>>> * beam 2.16: neither the multithreaded nor the multiprocess
>> mode achieves high CPU usage on multiple cores
>> >>>>>> >>>> * beam 2.17: able to achieve high CPU usage on all 4 cores
>> >>>>>> >>>> * beam 2.18: did not test the multithreaded mode, but the
>> multiprocess mode fails when trying to serialize the Environment instance,
>> most likely because of a change from 2.17 to 2.18
>> >>>>>> >>>>
>> >>>>>> >>>> I also briefly tried the SparkRunner with version 2.16 but
>> was not able to achieve any throughput.
>> >>>>>> >>>>
>> >>>>>> >>>> What is the recommended way to achieve what I am trying to
>> do? How can I troubleshoot?
>> >>>>>> >>>>
>>
>