You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Vilhelm von Ehrenheim (JIRA)" <ji...@apache.org> on 2017/04/18 15:34:41 UTC

[jira] [Commented] (BEAM-1996) Error about mixing pipelines in nosetests

    [ https://issues.apache.org/jira/browse/BEAM-1996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972925#comment-15972925 ] 

Vilhelm von Ehrenheim commented on BEAM-1996:
---------------------------------------------

Seems to be related to the extra parameter sent into the function. If defined as a proper class inheriting from PTransform w a custom __init__ taking care of the extra argument it works fine locally as well. Still a bit misleading to get this error... 

> Error about mixing pipelines in nosetests
> -----------------------------------------
>
>                 Key: BEAM-1996
>                 URL: https://issues.apache.org/jira/browse/BEAM-1996
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Vilhelm von Ehrenheim
>            Assignee: Thomas Groh
>            Priority: Minor
>
> When testing a PTranform (defined using @ptransform_fn) that merges several PCollections from different sources the following error is raised:
> {noformat}ValueError: Mixing value from different pipelines not allowed.{noformat}
> Actually running the same pipeline in GCP using the `DataflowRunner` does not give any error. Neither does running the test file manually instead of through nose. 
> Here is an example:
> {code:none|title=utils.py}
> # Defined in module `utils`
> @ptransform_fn
> def Join(pcolls, by):
>     return pcolls | beam.CoGroupByKey()
> {code}
> {code:none|title=test_utils.py}
> class UtilsTest(unittest.TestCase):
>     def test_join(self):
>         p = TestPipeline(runner="DirectRunner")
>         p1 = (p
>              | "Create p1" >> beam.Create([
>                  {'a': 1, 'b': 11},
>                  {'a': 2, 'b': 22},
>                  {'a': 3, 'b': 33}]))
>         p2 = (p
>              | "Create p2" >> beam.Create([
>                  {'a': 1, 'c': 111},
>                  {'a': 1, 'c': 112},
>                  {'a': 3, 'c': 333}]))
>         res = ((p1, p2) | "LeftJoin" >> utils.Join(by='a'))
>         beam.assert_that(res, beam.equal_to([
>             {'a': 1, 'b': 11, 'c': 111},
>             {'a': 1, 'b': 11, 'c': 112},
>             {'a': 2, 'b': 22},
>             {'a': 3, 'b': 33, 'c': 333}]))
>         # Run test pipeline
>         p.run()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)