Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/01/05 23:32:00 UTC

[jira] [Work logged] (BEAM-12984) InteractiveRunner cannot collect PCollections from composites

     [ https://issues.apache.org/jira/browse/BEAM-12984?focusedWorklogId=704326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-704326 ]

ASF GitHub Bot logged work on BEAM-12984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Jan/22 23:31
            Start Date: 05/Jan/22 23:31
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #15623:
URL: https://github.com/apache/beam/pull/15623#discussion_r779205403



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
##########
@@ -129,6 +129,41 @@ def test_fragment_does_not_prune_teststream(self):
     # resulting graph is invalid and the following call will raise an exception.
     fragment.to_runner_api()
 
+  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
+  def test_pipeline_composites(self, cell):
+    """Tests that composites are supported.
+    """
+    with cell:  # Cell 1
+      p = beam.Pipeline(ir.InteractiveRunner())
+      ib.watch({'p': p})
+
+    with cell:  # Cell 2
+      # pylint: disable=range-builtin-not-iterating
+      init = p | 'Init' >> beam.Create(range(5))
+
+    with cell:  # Cell 3
+      # Have a composite within a composite to test that all transforms under a
+      # composite are added.
+
+      @beam.ptransform_fn
+      def Bar(pcoll):
+        return pcoll | beam.Map(lambda n: n)
+
+      @beam.ptransform_fn
+      def Foo(pcoll):
+        p1 = pcoll | beam.Map(lambda n: n)
+        p2 = pcoll | beam.Map(str)
+        bar = pcoll | Bar()
+        return {'pc1': p1, 'pc2': p2, 'bar': bar}
+
+      res = init | Foo()
+
+    ib.watch(locals())
+    pc = res['pc1']
+
+    result = pf.PipelineFragment([pc]).run()
+    self.assertEqual([0, 1, 2, 3, 4], list(result.get(pc)))

Review comment:
       I'm not sure this is working as intended. I tried changing this test like so:
   
   ```py
         @beam.ptransform_fn
         def Bar(pcoll):
           return pcoll | beam.Map(lambda n: 2*n)
   
         @beam.ptransform_fn
         def Foo(pcoll):
           p1 = pcoll | beam.Map(lambda n: 3*n)
           p2 = pcoll | beam.Map(str)
           bar = p1 | Bar()
           return {'pc1': p1, 'pc2': p2, 'bar': bar}
   
         res = init | Foo()
         #pc = init | beam.Map(lambda n: 6*n)
   
       ib.watch(locals())
       pc = res['bar']
   
       result = pf.PipelineFragment([pc]).run()
       self.assertEqual([0, 6, 12, 18, 24], list(result.get(pc)))
   ```
   
   (Note the 2x and 3x multipliers, which make sure the PCollection values are different.)
   
   With this change, the test fails the assertion.
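The reviewer's expected values come from composing the maps: `bar` is `Bar` applied to `p1`, so each element becomes 2 * (3 * n) = 6n. The Beam-free sketch below reproduces that arithmetic. The helper functions are hypothetical stand-ins for the `beam.Map` transforms, not Beam code. It also shows why the test as written in the PR is weak: every numeric branch is an identity map, so `'pc1'` and `'bar'` hold the same values and the assertion cannot tell them apart.

```python
# Plain-Python stand-ins for the Map transforms in the reviewer's modified test.
# These helpers are illustrative only; they do not use Beam.

def bar(values):
    """Bar: Map(lambda n: 2*n)."""
    return [2 * n for n in values]


def foo(values):
    """Foo: two Maps over the input, then Bar applied to p1 (not to the input)."""
    p1 = [3 * n for n in values]   # Map(lambda n: 3*n)
    p2 = [str(n) for n in values]  # Map(str)
    return {'pc1': p1, 'pc2': p2, 'bar': bar(p1)}


def foo_original(values):
    """Foo as written in the PR: every numeric branch is an identity Map."""
    p1 = [n for n in values]
    p2 = [str(n) for n in values]
    bar_out = [n for n in values]  # original Bar is Map(lambda n: n) on the input
    return {'pc1': p1, 'pc2': p2, 'bar': bar_out}


modified = foo(list(range(5)))
original = foo_original(list(range(5)))

print(modified['pc1'])  # [0, 3, 6, 9, 12]
print(modified['bar'])  # [0, 6, 12, 18, 24]
# In the original test, 'pc1' and 'bar' hold identical values, so an
# assertion on either one cannot reveal which PCollection was collected.
print(original['pc1'] == original['bar'])  # True
```

With distinct multipliers, a fragment that computes the wrong PCollection can no longer pass the assertion by coincidence.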
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 704326)
    Time Spent: 1h 50m  (was: 1h 40m)

> InteractiveRunner cannot collect PCollections from composites 
> --------------------------------------------------------------
>
>                 Key: BEAM-12984
>                 URL: https://issues.apache.org/jira/browse/BEAM-12984
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-py-interactive
>            Reporter: Sam Rohde
>            Assignee: Sam Rohde
>            Priority: P2
>             Fix For: 2.34.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The following code throws an exception complaining that there is no producer for some PCollection:
> ```
> import apache_beam as beam
> import apache_beam.runners.interactive.interactive_beam as ib
> import apache_beam.runners.interactive.interactive_runner as ir
>  
> @beam.ptransform_fn
> def Foo(pcoll):
>   p1 = pcoll | 'ident' >> beam.Map(lambda n: n)
>   p2 = pcoll | 'to str' >> beam.Map(str)
>   return {'pc1': p1, 'pc2': p2}
>  
> p = beam.Pipeline(ir.InteractiveRunner())
> res = p | 'my create' >> beam.Create([1]) | 'my foo' >> Foo()
> ib.collect(res['pc1'])
> ```
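A Beam-free toy model can illustrate one plausible way a "no producer" error arises. This is an assumption about the failure mode, not a description of Beam's actual `PipelineFragment` logic or of the fix in the pull request. Suppose the lookup from each PCollection to the transform that produces it is built only from top-level transforms. Then outputs created inside a composite have no entry. Descending into composites and walking producers backward from the target recovers the set of leaf transforms needed. All names here (`Node`, `leaf_producers`, `top_level_producers`, `required_leaves`) are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical toy transform node; not Beam's AppliedPTransform."""
    name: str
    inputs: list
    outputs: list
    parts: list = field(default_factory=list)  # sub-transforms if composite


def leaf_producers(nodes):
    """Map each PCollection to the leaf that produces it, descending into composites."""
    producers = {}
    for node in nodes:
        if node.parts:
            producers.update(leaf_producers(node.parts))
        else:
            for out in node.outputs:
                producers[out] = node
    return producers


def top_level_producers(nodes):
    """Deliberately incomplete variant: ignores everything inside composites."""
    return {out: n for n in nodes if not n.parts for out in n.outputs}


def required_leaves(target, producers):
    """Walk producers backwards from target and collect every leaf needed."""
    needed, stack = set(), [target]
    while stack:
        pcoll = stack.pop()
        if pcoll not in producers:
            raise LookupError(f'no producer for {pcoll}')
        node = producers[pcoll]
        if node.name not in needed:
            needed.add(node.name)
            stack.extend(node.inputs)
    return needed


# Toy graph mirroring the report: 'my create' feeds the composite 'my foo'.
pipeline = [
    Node('my create', [], ['created']),
    Node('my foo', ['created'], ['pc1', 'pc2'], parts=[
        Node('my foo/ident', ['created'], ['pc1']),
        Node('my foo/to str', ['created'], ['pc2']),
    ]),
]

print(sorted(required_leaves('pc1', leaf_producers(pipeline))))
# ['my create', 'my foo/ident']

try:
    required_leaves('pc1', top_level_producers(pipeline))
except LookupError as err:
    print(err)  # no producer for pc1
```

Because `leaf_producers` recurses, the same walk also handles a composite nested inside another composite, which is the case the new test in the diff exercises.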



--
This message was sent by Atlassian Jira
(v8.20.1#820001)