Posted to commits@beam.apache.org by ro...@apache.org on 2020/12/15 01:52:36 UTC
[beam] branch master updated: [BEAM-10641] Add Combiner Packing to Dataflow (#13455)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f23ebad [BEAM-10641] Add Combiner Packing to Dataflow (#13455)
f23ebad is described below
commit f23ebadc58e6a2d7c4499285b85c3468dff70fef
Author: Yifan Mai <yi...@google.com>
AuthorDate: Mon Dec 14 17:51:47 2020 -0800
[BEAM-10641] Add Combiner Packing to Dataflow (#13455)
* [BEAM-10641] Add combiner packing to graph optimizer phases
* Changelog
* Fix import
* Fix use-before-define and lint issues.
* Fix import
Co-authored-by: Robert Bradshaw <ro...@gmail.com>
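Roughly speaking, combiner packing fuses several combining transforms that read the same input into a single combine. That combine carries a tuple accumulator, so the input is consumed once rather than once per combiner.

The following is a minimal pure-Python sketch of that idea, under stated assumptions. It is not Beam's `translations.pack_combiners`. `SumFn`, `MaxFn`, `PackedFn`, and `run_combine` are hypothetical names that only mimic the create/add/merge/extract shape of a Beam `CombineFn`.

```python
"""Toy illustration of combiner packing (hypothetical, not Beam's API)."""
from typing import Any, Iterable, List, Sequence, Tuple


class SumFn:
    """Hypothetical combine fn: sum of inputs."""

    def create_accumulator(self) -> int:
        return 0

    def add_input(self, acc: int, x: int) -> int:
        return acc + x

    def merge_accumulators(self, accs: Iterable[int]) -> int:
        return sum(accs)

    def extract_output(self, acc: int) -> int:
        return acc


class MaxFn:
    """Hypothetical combine fn: maximum of inputs (None if empty)."""

    def create_accumulator(self):
        return None

    def add_input(self, acc, x):
        return x if acc is None else max(acc, x)

    def merge_accumulators(self, accs):
        vals = [a for a in accs if a is not None]
        return max(vals) if vals else None

    def extract_output(self, acc):
        return acc


class PackedFn:
    """Runs several combine fns side by side using one tuple accumulator."""

    def __init__(self, fns: Sequence[Any]):
        self.fns = list(fns)

    def create_accumulator(self) -> Tuple:
        return tuple(fn.create_accumulator() for fn in self.fns)

    def add_input(self, acc: Tuple, x) -> Tuple:
        # Each element is fed to every component fn in the same pass.
        return tuple(fn.add_input(a, x) for fn, a in zip(self.fns, acc))

    def merge_accumulators(self, accs: Iterable[Tuple]) -> Tuple:
        accs = list(accs)
        if not accs:
            return self.create_accumulator()
        # Transpose: one column of partial accumulators per component fn.
        columns = zip(*accs)
        return tuple(
            fn.merge_accumulators(list(col))
            for fn, col in zip(self.fns, columns))

    def extract_output(self, acc: Tuple) -> Tuple:
        return tuple(fn.extract_output(a) for fn, a in zip(self.fns, acc))


def run_combine(fn, bundles: List[List[int]]):
    """Combine each bundle separately, then merge the partial results."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for x in bundle:
            acc = fn.add_input(acc, x)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


bundles = [[3, 1, 4], [1, 5], [9, 2, 6]]
packed = PackedFn([SumFn(), MaxFn()])
print(run_combine(packed, bundles))  # prints (31, 9)
```

The packed result matches what the two combiners would produce separately. The difference is that each input element is visited only once.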
---
CHANGES.md | 1 +
.../apache_beam/runners/dataflow/dataflow_runner.py | 16 ++++++++++++++++
2 files changed, 17 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index b0933ad..d5f9fdd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
* Publishing Java 11 SDK container images now supported as part of Apache Beam release process. ([BEAM-8106](https://issues.apache.org/jira/browse/BEAM-8106))
* Added Cloud Bigtable Provider extension to Beam SQL ([BEAM-11173](https://issues.apache.org/jira/browse/BEAM-11173), [BEAM-11373](https://issues.apache.org/jira/browse/BEAM-11373))
* Added a schema provider for thrift data ([BEAM-11338](https://issues.apache.org/jira/browse/BEAM-11338))
+* Added combiner packing pipeline optimization to Dataflow runner. ([BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641))
## Breaking Changes
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 92d9c19..7f4e536 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -537,6 +537,22 @@ class DataflowRunner(PipelineRunner):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
+    # Optimize the pipeline if it is not streaming and the
+    # disable_optimize_pipeline_for_dataflow experiment has not been set.
+    if (not options.view_as(StandardOptions).streaming and
+        not options.view_as(DebugOptions).lookup_experiment(
+            "disable_optimize_pipeline_for_dataflow")):
+      from apache_beam.runners.portability.fn_api_runner import translations
+      self.proto_pipeline = translations.optimize_pipeline(
+          self.proto_pipeline,
+          phases=[
+              translations.eliminate_common_key_with_none,
+              translations.pack_combiners,
+              translations.sort_stages,
+          ],
+          known_runner_urns=frozenset(),
+          partial=True)
+
     if use_fnapi:
       self._check_for_unsupported_fnapi_features(self.proto_pipeline)
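The guard added to `DataflowRunner` applies the optimization only to batch pipelines. Users can opt out through the `disable_optimize_pipeline_for_dataflow` experiment, passed on the command line as `--experiments=disable_optimize_pipeline_for_dataflow`.

Below is a hedged, self-contained restatement of that predicate in plain Python. `lookup_experiment` here is a hypothetical stand-in that mirrors the usual experiment-flag convention: a bare `key` means enabled, and `key=value` carries a value. It is not Beam's `DebugOptions` implementation, and `should_optimize` is likewise illustrative.

```python
from typing import Optional, Sequence

DISABLE_FLAG = "disable_optimize_pipeline_for_dataflow"


def lookup_experiment(experiments: Optional[Sequence[str]], key: str,
                      default=None):
    """Hypothetical stand-in: True for a bare 'key', the value for
    'key=value', otherwise `default`."""
    for exp in experiments or ():
        if exp == key:
            return True
        if exp.startswith(key + "="):
            return exp.split("=", 1)[1]
    return default


def should_optimize(streaming: bool,
                    experiments: Optional[Sequence[str]]) -> bool:
    """Mirror of the gate in the diff: batch only, unless opted out."""
    return not streaming and not lookup_experiment(experiments, DISABLE_FLAG)


print(should_optimize(False, None))            # prints True
print(should_optimize(False, [DISABLE_FLAG]))  # prints False
```

Expressing the check as a pure function of the streaming flag and the experiment list makes both branches easy to exercise without constructing a pipeline.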