Posted to commits@beam.apache.org by ib...@apache.org on 2021/01/07 21:54:47 UTC

[beam] branch master updated: [BEAM-11504] Clean up direct runner parallelism section.

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 36bf02c  [BEAM-11504] Clean up direct runner parallelism section.
     new d72b5d9  Merge pull request #13589 from ibzib/BEAM-11504
36bf02c is described below

commit 36bf02cfb2b37a894f6555c0038f06ba6a779e3a
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Dec 21 13:12:10 2020 -0800

    [BEAM-11504] Clean up direct runner parallelism section.
    
    - Remove unparseable language tags.
    
    - Add corresponding Java instructions.
    
    - Remove obsolete (version < 2.19) Python instructions.
    
    - Defer to programming guide for general instructions for setting pipeline options.
---
 .../content/en/documentation/runners/direct.md     | 112 +++++----------------
 1 file changed, 25 insertions(+), 87 deletions(-)

diff --git a/website/www/site/content/en/documentation/runners/direct.md b/website/www/site/content/en/documentation/runners/direct.md
index 0168dcc..1249aa9 100644
--- a/website/www/site/content/en/documentation/runners/direct.md
+++ b/website/www/site/content/en/documentation/runners/direct.md
@@ -57,6 +57,8 @@ Here are some resources with information about how to test your pipelines.
 
 ## Pipeline options for the Direct Runner
 
+For general instructions on how to set pipeline options, see the [programming guide](/documentation/programming-guide/#configuring-pipeline-options).
+
 When executing your pipeline from the command-line, set `runner` to `direct` or `DirectRunner`. The default values for the other pipeline options are generally sufficient.
 
 See the reference documentation for the
@@ -74,105 +76,41 @@ Local execution is limited by the memory available in your local environment. It
 
 If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.
 
-{:.language-py}
-### Execution Mode
+### Parallel execution
 
-{:.language-py}
+{{< paragraph class="language-py" >}}
 Python [FnApiRunner](https://beam.apache.org/contribute/runner-guide/#the-fn-api) supports multi-threading and multi-processing mode.
+{{< /paragraph >}}
+
+#### Setting parallelism
 
-{:.language-py}
-<strong>Setting parallelism</strong>
+{{< paragraph class="language-java" >}}
+The number of worker threads is defined by the `targetParallelism` pipeline option.
+By default, `targetParallelism` is the greater of the number of available processors and 3.
+{{< /paragraph >}}
 
-{:.language-py}
-Number of threads or subprocesses is defined by setting the `direct_num_workers` option.
+{{< paragraph class="language-py" >}}
+The number of threads or subprocesses is defined by the `direct_num_workers` pipeline option.
 From 2.22.0, `direct_num_workers = 0` is supported. When `direct_num_workers` is set to 0, it will set the number of threads/subprocess to the number of cores of the machine where the pipeline is running.
+{{< /paragraph >}}
 
-{:.language-py}
-* There are several ways to set this option.
-```py
-python wordcount.py --input xx --output xx --direct_num_workers 2
-```
-
-{:.language-py}
-* Setting with `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import PipelineOptions
-pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
-```
-
-{:.language-py}
-* Adding to existing `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import DirectOptions
-pipeline_options = PipelineOptions(xxx)
-pipeline_options.view_as(DirectOptions).direct_num_workers = 2
-```
-
-{:.language-py}
+{{< paragraph class="language-py" >}}
 <strong>Setting running mode</strong>
+{{< /paragraph >}}
 
-{:.language-py}
-From 2.19, a new option was added to set running mode. We can use `direct_running_mode` option to set the running mode.
+{{< paragraph class="language-py" >}}
+In Beam 2.19.0 and newer, you can use the `direct_running_mode` pipeline option to set the running mode.
 `direct_running_mode` can be one of [`'in_memory'`, `'multi_threading'`, `'multi_processing'`].
+{{< /paragraph >}}
 
-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>in_memory</b>: Runner and workers' communication happens in memory (not through gRPC). This is a default mode.
+{{< /paragraph >}}
 
-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>multi_threading</b>: Runner and workers communicate through gRPC and each worker runs in a thread.
+{{< /paragraph >}}
 
-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>multi_processing</b>: Runner and workers communicate through gRPC and each worker runs in a subprocess.
-
-{:.language-py}
-Same as other options, `direct_running_mode` can be passed through CLI or set with `PipelineOptions`.
-
-{:.language-py}
-For the versions before 2.19.0, the running mode should be set with `FnApiRunner()`. Please refer following examples.
-
-{:.language-py}
-#### Running with multi-threading mode
-```py
-import argparse
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-      runner=fn_api_runner.FnApiRunner(
-          default_environment=beam_runner_api_pb2.Environment(
-          urn=python_urns.EMBEDDED_PYTHON_GRPC)))
-```
-
-{:.language-py}
-#### Running with multi-processing mode
-```py
-import argparse
-import sys
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-      runner=fn_api_runner.FnApiRunner(
-          default_environment=beam_runner_api_pb2.Environment(
-              urn=python_urns.SUBPROCESS_SDK,
-              payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
-                        % sys.executable.encode('ascii'))))
-```
+{{< /paragraph >}}
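The Python options described in the patch above (`runner`, `direct_num_workers`, `direct_running_mode`) can be combined as in the following sketch. Only the flag names and the three running-mode values come from the documentation; `direct_runner_args` and `RUNNING_MODES` are hypothetical names introduced for illustration. The Beam part is guarded so the helper still runs when the `apache-beam` package is not installed.

```python
"""Hypothetical helper that builds Direct Runner flags for a Python pipeline."""

# The three values accepted by direct_running_mode (Beam 2.19.0 and newer).
RUNNING_MODES = ("in_memory", "multi_threading", "multi_processing")


def direct_runner_args(num_workers=0, running_mode="in_memory"):
    """Return command-line flags for the Direct Runner.

    num_workers=0 asks the runner to use one worker per CPU core
    (supported from Beam 2.22.0).
    """
    if running_mode not in RUNNING_MODES:
        raise ValueError(f"unknown direct_running_mode: {running_mode!r}")
    if num_workers < 0:
        raise ValueError("direct_num_workers must be >= 0")
    return [
        "--runner=DirectRunner",
        f"--direct_num_workers={num_workers}",
        f"--direct_running_mode={running_mode}",
    ]


if __name__ == "__main__":
    args = direct_runner_args(num_workers=2, running_mode="multi_processing")
    print(args)
    try:
        # Requires the apache-beam package; skipped if it is not installed.
        from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions
    except ImportError:
        pass
    else:
        # Parse the flags and read them back through the DirectOptions view.
        options = PipelineOptions(args)
        direct = options.view_as(DirectOptions)
        print(direct.direct_num_workers, direct.direct_running_mode)
```

Appending the same flags to a pipeline's command line should have the same effect as building them in code. For Java pipelines, the corresponding option named in the patch is `targetParallelism`.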