You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/12/15 01:52:36 UTC

[beam] branch master updated: [BEAM-10641] Add Combiner Packing to Dataflow (#13455)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f23ebad  [BEAM-10641] Add Combiner Packing to Dataflow (#13455)
f23ebad is described below

commit f23ebadc58e6a2d7c4499285b85c3468dff70fef
Author: Yifan Mai <yi...@google.com>
AuthorDate: Mon Dec 14 17:51:47 2020 -0800

    [BEAM-10641] Add Combiner Packing to Dataflow (#13455)
    
    * [BEAM-10641] Add combiner packing to graph optimizer phases
    
    * Changelog
    
    * Fix import
    
    * Use-before define and lint issue.
    
    * Fix import
    
    Co-authored-by: Robert Bradshaw <ro...@gmail.com>
---
 CHANGES.md                                               |  1 +
 .../apache_beam/runners/dataflow/dataflow_runner.py      | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index b0933ad..d5f9fdd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 * Publishing Java 11 SDK container images now supported as part of Apache Beam release process. ([BEAM-8106](https://issues.apache.org/jira/browse/BEAM-8106))
 * Added Cloud Bigtable Provider extension to Beam SQL ([BEAM-11173](https://issues.apache.org/jira/browse/BEAM-11173), [BEAM-11373](https://issues.apache.org/jira/browse/BEAM-11373))
 * Added a schema provider for thrift data ([BEAM-11338](https://issues.apache.org/jira/browse/BEAM-11338))
+* Added combiner packing pipeline optimization to Dataflow runner. ([BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641))
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 92d9c19..7f4e536 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -537,6 +537,22 @@ class DataflowRunner(PipelineRunner):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it not streaming and the
+    # disable_optimize_pipeline_for_dataflow experiment has not been set.
+    if (not options.view_as(StandardOptions).streaming and
+        not options.view_as(DebugOptions).lookup_experiment(
+            "disable_optimize_pipeline_for_dataflow")):
+      from apache_beam.runners.portability.fn_api_runner import translations
+      self.proto_pipeline = translations.optimize_pipeline(
+          self.proto_pipeline,
+          phases=[
+              translations.eliminate_common_key_with_none,
+              translations.pack_combiners,
+              translations.sort_stages,
+          ],
+          known_runner_urns=frozenset(),
+          partial=True)
+
     if use_fnapi:
       self._check_for_unsupported_fnapi_features(self.proto_pipeline)