You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2020/03/17 19:40:20 UTC

[beam] branch master updated: [BEAM-7923] Include side effects in p.run

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b458cfd  [BEAM-7923] Include side effects in p.run
     new df482df  Merge pull request #11141 from KevinGG/BEAM-7923-fix
b458cfd is described below

commit b458cfdc65086b7476f0c949a1389dccf8a681f1
Author: KevinGG <ka...@gmail.com>
AuthorDate: Mon Mar 16 15:03:20 2020 -0700

    [BEAM-7923] Include side effects in p.run
    
    1. PCollections that are never used as inputs and are not watched, such as
    sinks whose results are not assigned to variables, were previously pruned
    before `p.run()`. This change ensures that these side-effect PCollections
    are now treated as extended targets and are executed on `p.run()`.
    2. Note that the change does not affect `show`, `head` and `collect`,
    because they use additional pipeline-fragment logic that already prunes
    everything unrelated before instrumentation runs, and therefore before
    the pruning logic inside instrumentation runs.
---
 .../runners/interactive/pipeline_instrument.py          | 17 +++++++++++++++++
 .../runners/interactive/pipeline_instrument_test.py     | 12 ++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index b39fea4..b88517c 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -403,6 +403,8 @@ class PipelineInstrument(object):
       self._pipeline
     """
     cacheable_inputs = set()
+    all_inputs = set()
+    all_outputs = set()
     unbounded_source_pcolls = set()
 
     class InstrumentVisitor(PipelineVisitor):
@@ -418,10 +420,16 @@ class PipelineInstrument(object):
                       tuple(ie.current_env().options.capturable_sources)):
           unbounded_source_pcolls.update(transform_node.outputs.values())
         cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
+        ins, outs = self._pin._all_inputs_outputs(transform_node)
+        all_inputs.update(ins)
+        all_outputs.update(outs)
 
     v = InstrumentVisitor(self)
     self._pipeline.visit(v)
 
+    # Every output PCollection that is never used as an input PCollection is
+    # considered as a side effect of the pipeline run and should be included.
+    self._extended_targets.update(all_outputs.difference(all_inputs))
     # Add the unbounded source pcollections to the cacheable inputs. This allows
     # for the caching of unbounded sources without a variable reference.
     cacheable_inputs.update(unbounded_source_pcolls)
@@ -720,6 +728,15 @@ class PipelineInstrument(object):
         inputs.add(in_pcoll)
     return inputs
 
+  def _all_inputs_outputs(self, transform):
+    inputs = set()
+    outputs = set()
+    for in_pcoll in transform.inputs:
+      inputs.add(in_pcoll)
+    for _, out_pcoll in transform.outputs.items():
+      outputs.add(out_pcoll)
+    return inputs, outputs
+
   def _cacheable_key(self, pcoll):
     """Gets the key a cacheable PCollection is tracked within the instrument."""
     return cacheable_key(
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 8fa9724..f10e98d 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -798,6 +798,18 @@ class PipelineInstrumentTest(unittest.TestCase):
     assert_pipeline_proto_contain_top_level_transform(
         self, full_proto, 'Init Source')
 
+  def test_side_effect_pcoll_is_included(self):
+    pipeline_with_side_effect = beam.Pipeline(
+        interactive_runner.InteractiveRunner())
+    # Deliberately not assign the result to a variable to make it a
+    # "side effect" transform. Note we never watch anything from
+    # the pipeline defined locally either.
+    # pylint: disable=range-builtin-not-iterating,expression-not-assigned
+    pipeline_with_side_effect | 'Init Create' >> beam.Create(range(10))
+    pipeline_instrument = instr.build_pipeline_instrument(
+        pipeline_with_side_effect)
+    self.assertTrue(pipeline_instrument._extended_targets)
+
 
 if __name__ == '__main__':
   unittest.main()