Posted to commits@beam.apache.org by ib...@apache.org on 2021/01/07 21:54:47 UTC
[beam] branch master updated: [BEAM-11504] Clean up direct runner parallelism section.
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 36bf02c [BEAM-11504] Clean up direct runner parallelism section.
new d72b5d9 Merge pull request #13589 from ibzib/BEAM-11504
36bf02c is described below
commit 36bf02cfb2b37a894f6555c0038f06ba6a779e3a
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Dec 21 13:12:10 2020 -0800
[BEAM-11504] Clean up direct runner parallelism section.
- Remove unparseable language tags.
- Add corresponding Java instructions.
- Remove obsolete (version < 2.19) Python instructions.
- Defer to programming guide for general instructions for setting pipeline options.
---
.../content/en/documentation/runners/direct.md | 112 +++++----------------
1 file changed, 25 insertions(+), 87 deletions(-)
diff --git a/website/www/site/content/en/documentation/runners/direct.md b/website/www/site/content/en/documentation/runners/direct.md
index 0168dcc..1249aa9 100644
--- a/website/www/site/content/en/documentation/runners/direct.md
+++ b/website/www/site/content/en/documentation/runners/direct.md
@@ -57,6 +57,8 @@ Here are some resources with information about how to test your pipelines.
## Pipeline options for the Direct Runner
+For general instructions on how to set pipeline options, see the [programming guide](/documentation/programming-guide/#configuring-pipeline-options).
+
When executing your pipeline from the command-line, set `runner` to `direct` or `DirectRunner`. The default values for the other pipeline options are generally sufficient.
See the reference documentation for the
@@ -74,105 +76,41 @@ Local execution is limited by the memory available in your local environment. It
If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.
-{:.language-py}
-### Execution Mode
+### Parallel execution
-{:.language-py}
+{{< paragraph class="language-py" >}}
Python [FnApiRunner](https://beam.apache.org/contribute/runner-guide/#the-fn-api) supports multi-threading and multi-processing mode.
+{{< /paragraph >}}
+
+#### Setting parallelism
-{:.language-py}
-<strong>Setting parallelism</strong>
+{{< paragraph class="language-java" >}}
+The number of worker threads is defined by the `targetParallelism` pipeline option.
+By default, `targetParallelism` is the greater of the number of available processors and 3.
+{{< /paragraph >}}
-{:.language-py}
-Number of threads or subprocesses is defined by setting the `direct_num_workers` option.
+{{< paragraph class="language-py" >}}
+Number of threads or subprocesses is defined by setting the `direct_num_workers` pipeline option.
From 2.22.0, `direct_num_workers = 0` is supported. When `direct_num_workers` is set to 0, it will set the number of threads/subprocess to the number of cores of the machine where the pipeline is running.
+{{< /paragraph >}}
-{:.language-py}
-* There are several ways to set this option.
-```py
-python wordcount.py --input xx --output xx --direct_num_workers 2
-```
-
-{:.language-py}
-* Setting with `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import PipelineOptions
-pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
-```
-
-{:.language-py}
-* Adding to existing `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import DirectOptions
-pipeline_options = PipelineOptions(xxx)
-pipeline_options.view_as(DirectOptions).direct_num_workers = 2
-```
-
-{:.language-py}
+{{< paragraph class="language-py" >}}
<strong>Setting running mode</strong>
+{{< /paragraph >}}
-{:.language-py}
-From 2.19, a new option was added to set running mode. We can use `direct_running_mode` option to set the running mode.
+{{< paragraph class="language-py" >}}
+In Beam 2.19.0 and newer, you can use the `direct_running_mode` pipeline option to set the running mode.
`direct_running_mode` can be one of [`'in_memory'`, `'multi_threading'`, `'multi_processing'`].
+{{< /paragraph >}}
-{:.language-py}
+{{< paragraph class="language-py" >}}
<b>in_memory</b>: Runner and workers' communication happens in memory (not through gRPC). This is a default mode.
+{{< /paragraph >}}
-{:.language-py}
+{{< paragraph class="language-py" >}}
<b>multi_threading</b>: Runner and workers communicate through gRPC and each worker runs in a thread.
+{{< /paragraph >}}
-{:.language-py}
+{{< paragraph class="language-py" >}}
<b>multi_processing</b>: Runner and workers communicate through gRPC and each worker runs in a subprocess.
-
-{:.language-py}
-Same as other options, `direct_running_mode` can be passed through CLI or set with `PipelineOptions`.
-
-{:.language-py}
-For the versions before 2.19.0, the running mode should be set with `FnApiRunner()`. Please refer following examples.
-
-{:.language-py}
-#### Running with multi-threading mode
-```py
-import argparse
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-                  runner=fn_api_runner.FnApiRunner(
-                      default_environment=beam_runner_api_pb2.Environment(
-                          urn=python_urns.EMBEDDED_PYTHON_GRPC)))
-```
-
-{:.language-py}
-#### Running with multi-processing mode
-```py
-import argparse
-import sys
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-                  runner=fn_api_runner.FnApiRunner(
-                      default_environment=beam_runner_api_pb2.Environment(
-                          urn=python_urns.SUBPROCESS_SDK,
-                          payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
-                                  % sys.executable.encode('ascii'))))
-```
+{{< /paragraph >}}