You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Hannah Jiang (Jira)" <ji...@apache.org> on 2020/01/30 23:19:00 UTC

[jira] [Created] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

Hannah Jiang created BEAM-9228:
----------------------------------

             Summary: _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
                 Key: BEAM-9228
                 URL: https://issues.apache.org/jira/browse/BEAM-9228
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.18.0, 2.16.0
            Reporter: Hannah Jiang


A user reported following issue.

-------------------------------------------------
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns
import sys

pipeline_options = PipelineOptions(['--direct_num_workers', '4'])

file_pattern = 'part-r-00*
runner=fn_api_runner.FnApiRunner(
          default_environment=beam_runner_api_pb2.Environment(
              urn=python_urns.SUBPROCESS_SDK,
              payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                        % sys.executable.encode('ascii')))

p = beam.Pipeline(runner=runner, options=pipeline_options)

lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
           | beam.combiners.Count.Globally()
           | beam.io.WriteToText('/tmp/output'))

p.run()
--------------------------------------------------------------------------------------

This is caused by [this PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].

A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed that data is shuffled to multiple workers, however, there are some regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)