Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/04 21:31:02 UTC

[GitHub] [beam] yifanmai commented on a change in pull request #13884: [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763)

yifanmai commented on a change in pull request #13884:
URL: https://github.com/apache/beam/pull/13884#discussion_r570556744



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):

Review comment:
       Actually it's always enabled in `all`, and here we allow it to be enabled by specifying a comma-delimited list of phases. i.e. it's one of the allowed phases. See [previous discussion](https://github.com/apache/beam/pull/13763#discussion_r565530996) in the PR to be reverted.
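
       To illustrate the mechanism being discussed, here is a hedged, standalone sketch of the phase-selection logic from the diff above. The real phases live in `apache_beam.runners.portability.fn_api_runner.translations`; the placeholder functions and the `select_phases` helper below are hypothetical stand-ins so the parsing of the `pre_optimize` experiment value can run on its own:

```python
# Standalone sketch of the pre_optimize phase selection shown in the diff.
# The placeholder functions below stand in for the real translation phases
# in apache_beam.runners.portability.fn_api_runner.translations.

def eliminate_common_key_with_none(stages):  # placeholder for the real phase
    return stages

def pack_combiners(stages):  # placeholder for the real phase
    return stages

def sort_stages(stages):  # placeholder; always appended last
    return stages

# Only these phase names may appear in a comma-delimited pre_optimize value.
_ALLOWED_PHASES = {
    'eliminate_common_key_with_none': eliminate_common_key_with_none,
    'pack_combiners': pack_combiners,
}

def select_phases(pre_optimize, streaming=False):
    """Mirror the experiment parsing: streaming pipelines and the values
    'none'/'default' select no phases; 'all' selects a fixed list; any other
    value is treated as a comma-delimited list of allowed phase names."""
    pre_optimize = pre_optimize.lower()
    if streaming or pre_optimize in ('none', 'default'):
        return []
    if pre_optimize == 'all':
        # pack_combiners is omitted here, matching the TODO in the diff.
        return [eliminate_common_key_with_none, sort_stages]
    phases = []
    for phase_name in pre_optimize.split(','):
        if phase_name not in _ALLOWED_PHASES:
            raise ValueError(
                'Unknown or inapplicable phase for pre_optimize: %s' %
                phase_name)
        phases.append(_ALLOWED_PHASES[phase_name])
    phases.append(sort_stages)
    return phases
```

       So `pre_optimize=pack_combiners` opts into combiner packing explicitly even while it is disabled in `all`, which is the behavior the comment above describes.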

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+          phases.append(getattr(translations, phase_name))
+        else:
+          raise ValueError(
+              'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
+      phases.append(translations.sort_stages)
+
+    self.proto_pipeline = translations.optimize_pipeline(

Review comment:
       Acknowledged. This PR removes `from_runner_api` from DataflowRunner so this shouldn't be an issue.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org