Posted to issues@beam.apache.org by "Hannah Jiang (Jira)" <ji...@apache.org> on 2019/09/03 22:37:00 UTC
[jira] [Comment Edited] (BEAM-3645) Support multi-process execution on the FnApiRunner
[ https://issues.apache.org/jira/browse/BEAM-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896575#comment-16896575 ]
Hannah Jiang edited comment on BEAM-3645 at 9/3/19 10:36 PM:
-------------------------------------------------------------
The direct runner can now process map tasks across multiple workers. Depending on the configured environment, these workers run in multithreading or multiprocessing mode.
_*This is supported as of Beam 2.15.*_
*Run in multiprocessing mode:*
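{code:java}
# using subprocess runner
import sys
import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

p = beam.Pipeline(options=pipeline_options,
                  runner=fn_api_runner.FnApiRunner(
                      default_environment=beam_runner_api_pb2.Environment(
                          urn=python_urns.SUBPROCESS_SDK,
                          payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
                          sys.executable.encode('ascii'))))
{code}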
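*Run in multithreading mode:*
{code:java}
# using embedded grpc runner
import apache_beam as beam
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

p = beam.Pipeline(options=pipeline_options,
                  runner=fn_api_runner.FnApiRunner(
                      default_environment=beam_runner_api_pb2.Environment(
                          urn=python_urns.EMBEDDED_PYTHON_GRPC,
                          payload=b'1')))  # payload is the number of threads per worker.
{code}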
The *--direct_num_workers* option controls parallelism. The default value is 1.
{code:java}
# Example: pass it on the command line.
python wordcount.py --input xx --output xx --direct_num_workers 2

# Example: set it when constructing PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])

# Example: set it on existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx  # placeholder for your existing PipelineOptions
pipeline_options.view_as(DirectOptions).direct_num_workers = 2
{code}
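To make the effect of the worker count and the two modes concrete, here is a minimal standard-library sketch. It is not Beam's implementation: parse_num_workers, run_map and word_length are hypothetical names invented for this illustration. It only assumes that the flag is an integer defaulting to 1, as stated above, and that map tasks are spread over that many threads or processes.

```python
import argparse
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def parse_num_workers(argv):
    """Read --direct_num_workers (default 1) and ignore all other flags."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--direct_num_workers', type=int, default=1)
    # parse_known_args leaves unrelated flags such as --input untouched.
    known, _unknown = parser.parse_known_args(argv)
    if known.direct_num_workers < 1:
        raise ValueError('--direct_num_workers must be at least 1')
    return known.direct_num_workers


def run_map(fn, items, num_workers=1, mode='thread'):
    """Apply fn to every item using num_workers threads or processes.

    Hypothetical helper for illustration only; this is not a Beam API.
    """
    if mode == 'thread':
        executor_cls = ThreadPoolExecutor
    elif mode == 'process':
        executor_cls = ProcessPoolExecutor
    else:
        raise ValueError('unknown mode: %r' % mode)
    with executor_cls(max_workers=num_workers) as executor:
        # map() returns results in input order, whichever worker finishes first.
        return list(executor.map(fn, items))


def word_length(word):
    # Defined at module level so that process mode can pickle it.
    return len(word)
```

Threads share one interpreter and therefore one GIL, so only process mode can use several cores for CPU-bound Python functions. When trying mode='process' in a script, call run_map from under an if __name__ == '__main__': guard, since some platforms start workers by re-importing the main module.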
was (Author: hannahjiang):
Direct runner can now process map tasks across multiple workers. Depending on running environment, these workers are running in multithreading or multiprocessing mode.
_*It is supported from Beam 2.15.*_
*Run with multiprocessing mode*:
{code:java}
# using subprocess runner
p = beam.Pipeline(options=pipeline_options,
                  runner=fn_api_runner.FnApiRunner(
                      default_environment=beam_runner_api_pb2.Environment(
                          urn=python_urns.SUBPROCESS_SDK,
                          payload=b'%s -m apache_beam.runners.worker.sdk_worker_main' %
                          sys.executable.encode('ascii'))))
{code}
*Run with multithreading mode:*
{code:java}
# using in memory embedded runner
p = beam.Pipeline(options=pipeline_options)
{code}
{code:java}
# using embedded grpc runner
p = beam.Pipeline(options=pipeline_options,
                  runner=fn_api_runner.FnApiRunner(
                      default_environment=beam_runner_api_pb2.Environment(
                          urn=python_urns.EMBEDDED_PYTHON_GRPC,
                          payload=b'1'))) # payload is # of threads of each worker.{code}
*--direct_num_workers* option is used to control parallelism. Default value is 1.
{code:java}
# an example to pass it from CLI.
python wordcount.py --input xx --output xx --direct_num_workers 2
# an example to set it with PipelineOptions.
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
# an example to add it to existing pipeline options.
from apache_beam.options.pipeline_options import DirectOptions
pipeline_options = xxx
pipeline_options.view_as(DirectOptions).direct_num_workers = 2{code}
> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
> Key: BEAM-3645
> URL: https://issues.apache.org/jira/browse/BEAM-3645
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Charles Chen
> Assignee: Hannah Jiang
> Priority: Major
> Fix For: 2.15.0
>
> Time Spent: 35h 20m
> Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance gain over the previous DirectRunner. We can do even better in multi-core environments by supporting multi-process execution in the FnApiRunner, to scale past Python GIL limitations.
--
This message was sent by Atlassian Jira
(v8.3.2#803003)