Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/16 22:29:36 UTC

[GitHub] [beam] KevinGG opened a new pull request #11141: [BEAM-7923] Include side effects in p.run

KevinGG opened a new pull request #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141
 
 
   1. PCollections that are never used as inputs and are not watched, such as sinks that are
   not assigned to variables, were pruned before `p.run()`. This change makes sure that
   these side-effect PCollections are now treated as extended targets
   and are executed on `p.run()`.
   2. Note that the change does not affect `show`, `head` and `collect`, because
   they have additional pipeline fragment logic that already prunes
   everything unrelated before the instrumenting step and the prune logic inside
   it.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] aaltay merged pull request #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
aaltay merged pull request #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141
 
 
   


[GitHub] [beam] KevinGG commented on issue #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
KevinGG commented on issue #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#issuecomment-599788607
 
 
   Formatted with yapf.
   Lint passed locally.
   
   R: @aaltay 
   R: @davidyan74 
   R: @rohdesamuel 
   
   PTAL, thx!


[GitHub] [beam] aaltay commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#discussion_r393351076
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
 ##########
 @@ -418,10 +420,16 @@ def visit_transform(self, transform_node):
                       tuple(ie.current_env().options.capturable_sources)):
           unbounded_source_pcolls.update(transform_node.outputs.values())
         cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
+        ins, outs = self._pin._all_inputs_outputs(transform_node)
+        all_inputs.update(ins)
+        all_outputs.update(outs)
 
     v = InstrumentVisitor(self)
     self._pipeline.visit(v)
 
+    # Every output PCollection that is never used as an input PCollection is
+    # considered as a side effect of the pipeline run and should be included.
+    self._extended_targets.update(all_outputs.difference(all_inputs))
 
 Review comment:
   Do you want to track or mark side effects differently? Do users want to specifically track these PCollections?


[GitHub] [beam] aaltay commented on issue #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#issuecomment-600261516
 
 
   Merged this without noticing that the tests did not run. Fooled by GitHub's "all green check signs". Please watch the tests, especially the cron ones, and see if anything is failing.
   
   Or better, create an empty PR to run the tests.


[GitHub] [beam] KevinGG commented on issue #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
KevinGG commented on issue #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#issuecomment-600260121
 
 
   Rebased to resolve merge conflicts and force pushed!


[GitHub] [beam] KevinGG commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#discussion_r393352768
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
 ##########
 @@ -418,10 +420,16 @@ def visit_transform(self, transform_node):
                       tuple(ie.current_env().options.capturable_sources)):
           unbounded_source_pcolls.update(transform_node.outputs.values())
         cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
+        ins, outs = self._pin._all_inputs_outputs(transform_node)
+        all_inputs.update(ins)
+        all_outputs.update(outs)
 
     v = InstrumentVisitor(self)
     self._pipeline.visit(v)
 
+    # Every output PCollection that is never used as an input PCollection is
+    # considered as a side effect of the pipeline run and should be included.
+    self._extended_targets.update(all_outputs.difference(all_inputs))
 
 Review comment:
   It's not necessary. The intended behavior is not ambiguous: when the user uses the `show`, `head` or `collect` APIs, these PCollections are excluded completely, as the user explicitly wishes. And when the user invokes `p.run()`, all transforms in the pipeline should be executed as expected.
   
   This change is only to make sure that the prune logic doesn't affect the above intended behavior.
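
The set arithmetic in the diff above can be illustrated with a toy model. This is only a sketch: the function `side_effect_outputs` and the tuple-based pipeline representation are hypothetical stand-ins, not Beam APIs. Each transform is modeled as a pair of (input names, output names), and any output that no transform consumes is treated as a side effect to keep.

```python
def side_effect_outputs(transforms):
    """Return outputs that are never consumed as inputs.

    `transforms` is a hypothetical stand-in for a pipeline graph:
    an iterable of (inputs, outputs) pairs of PCollection names.
    """
    all_inputs, all_outputs = set(), set()
    for inputs, outputs in transforms:
        all_inputs.update(inputs)
        all_outputs.update(outputs)
    # Same idea as all_outputs.difference(all_inputs) in the diff.
    return all_outputs.difference(all_inputs)


# Toy pipeline: read -> squared -> write_result (a sink nobody reads from).
toy_pipeline = [
    ((), ("read",)),
    (("read",), ("squared",)),
    (("squared",), ("write_result",)),
]
targets = side_effect_outputs(toy_pipeline)
```

In this toy model only the sink output survives the difference, which mirrors why an unassigned sink would be added to the extended targets for `p.run()`.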


[GitHub] [beam] aaltay commented on issue #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#issuecomment-599839271
 
 
   Could you resolve the conflict?


[GitHub] [beam] KevinGG commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #11141: [BEAM-7923] Include side effects in p.run
URL: https://github.com/apache/beam/pull/11141#discussion_r393347005
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/utils_test.py
 ##########
 @@ -56,7 +56,7 @@ def test_parse_windowedvalue_with_window_info(self):
         [['a', 2, int(1e6), els[0].windows, els[0].pane_info],
          ['b', 3, int(1e6), els[1].windows, els[1].pane_info]],
         columns=[0, 1, 'event_time', 'windows', 'pane_info'])
-    pd.testing.assert_frame_equal(actual_df, expected_df)
+    pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
 
 Review comment:
   This test was flaky because the dataframe columns can be built in arbitrary order. With `check_like=True`, the comparison ignores column positioning, since we only care about the equivalence of the data.
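
A small standalone illustration of the option, assuming pandas is installed. The frames and the helper `frames_equal` are made up for this sketch and are not part of the Beam test.

```python
import pandas as pd

left = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
right = left[["b", "a"]]  # same data, columns in a different order


def frames_equal(x, y, **kwargs):
    """Hypothetical helper: True if assert_frame_equal does not raise."""
    try:
        pd.testing.assert_frame_equal(x, y, **kwargs)
        return True
    except AssertionError:
        return False


strict = frames_equal(left, right)                    # column order matters
relaxed = frames_equal(left, right, check_like=True)  # column order ignored
```

Note that `check_like=True` also ignores the order of the index, so it only suits tests where row and column ordering are both irrelevant to correctness.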
