You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/03/17 19:40:20 UTC
[beam] branch master updated: [BEAM-7923] Include side effects in
p.run
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b458cfd [BEAM-7923] Include side effects in p.run
new df482df Merge pull request #11141 from KevinGG/BEAM-7923-fix
b458cfd is described below
commit b458cfdc65086b7476f0c949a1389dccf8a681f1
Author: KevinGG <ka...@gmail.com>
AuthorDate: Mon Mar 16 15:03:20 2020 -0700
[BEAM-7923] Include side effects in p.run
1. PCollections that are never used as inputs and are not watched, such as sinks that are
not assigned to variables, were previously pruned before `p.run()`. This change makes sure that
these side-effect PCollections are now considered extended targets
and are executed on `p.run()`.
2. Note that this change does not affect `show`, `head` and `collect`, because
they use additional pipeline-fragment logic that already prunes
everything unrelated before instrumentation (and the pruning logic inside
instrumentation) runs.
---
.../runners/interactive/pipeline_instrument.py | 17 +++++++++++++++++
.../runners/interactive/pipeline_instrument_test.py | 12 ++++++++++++
2 files changed, 29 insertions(+)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index b39fea4..b88517c 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -403,6 +403,8 @@ class PipelineInstrument(object):
self._pipeline
"""
cacheable_inputs = set()
+ all_inputs = set()
+ all_outputs = set()
unbounded_source_pcolls = set()
class InstrumentVisitor(PipelineVisitor):
@@ -418,10 +420,16 @@ class PipelineInstrument(object):
tuple(ie.current_env().options.capturable_sources)):
unbounded_source_pcolls.update(transform_node.outputs.values())
cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
+ ins, outs = self._pin._all_inputs_outputs(transform_node)
+ all_inputs.update(ins)
+ all_outputs.update(outs)
v = InstrumentVisitor(self)
self._pipeline.visit(v)
+ # Every output PCollection that is never used as an input PCollection is
+ # considered as a side effect of the pipeline run and should be included.
+ self._extended_targets.update(all_outputs.difference(all_inputs))
# Add the unbounded source pcollections to the cacheable inputs. This allows
# for the caching of unbounded sources without a variable reference.
cacheable_inputs.update(unbounded_source_pcolls)
@@ -720,6 +728,15 @@ class PipelineInstrument(object):
inputs.add(in_pcoll)
return inputs
+ def _all_inputs_outputs(self, transform):
+ inputs = set()
+ outputs = set()
+ for in_pcoll in transform.inputs:
+ inputs.add(in_pcoll)
+ for _, out_pcoll in transform.outputs.items():
+ outputs.add(out_pcoll)
+ return inputs, outputs
+
def _cacheable_key(self, pcoll):
"""Gets the key a cacheable PCollection is tracked within the instrument."""
return cacheable_key(
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 8fa9724..f10e98d 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -798,6 +798,18 @@ class PipelineInstrumentTest(unittest.TestCase):
assert_pipeline_proto_contain_top_level_transform(
self, full_proto, 'Init Source')
+ def test_side_effect_pcoll_is_included(self):
+ pipeline_with_side_effect = beam.Pipeline(
+ interactive_runner.InteractiveRunner())
+ # Deliberately not assign the result to a variable to make it a
+ # "side effect" transform. Note we never watch anything from
+ # the pipeline defined locally either.
+ # pylint: disable=range-builtin-not-iterating,expression-not-assigned
+ pipeline_with_side_effect | 'Init Create' >> beam.Create(range(10))
+ pipeline_instrument = instr.build_pipeline_instrument(
+ pipeline_with_side_effect)
+ self.assertTrue(pipeline_instrument._extended_targets)
+
if __name__ == '__main__':
unittest.main()