You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Vilhelm von Ehrenheim (JIRA)" <ji...@apache.org> on 2017/04/18 14:36:41 UTC

[jira] [Created] (BEAM-1996) Error about mixing pipelines in nosetests

Vilhelm von Ehrenheim created BEAM-1996:
-------------------------------------------

             Summary: Error about mixing pipelines in nosetests
                 Key: BEAM-1996
                 URL: https://issues.apache.org/jira/browse/BEAM-1996
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
            Reporter: Vilhelm von Ehrenheim
            Assignee: Thomas Groh
            Priority: Minor


When testing a PTranform (defined using @ptransform_fn) that merges several PCollections from different sources the following error is raised:
{noformat}ValueError: Mixing value from different pipelines not allowed.{noformat}
Actually running the same pipeline in GCP using the `DataflowRunner` does not give any error. Neither does running the test file manually instead of through nose. 

Here is an example:
{code:none|title=utils.py}
# Defined in module `utils`
@ptransform_fn
def Join(pcolls, by):
    return pcolls | beam.CoGroupByKey()
{code}

{code:none|title=test_utils.py}
class UtilsTest(unittest.TestCase):
    def test_join(self):
        p = TestPipeline(runner="DirectRunner")

        p1 = (p
             | "Create p1" >> beam.Create([
                 {'a': 1, 'b': 11},
                 {'a': 2, 'b': 22},
                 {'a': 3, 'b': 33}]))
        p2 = (p
             | "Create p2" >> beam.Create([
                 {'a': 1, 'c': 111},
                 {'a': 1, 'c': 112},
                 {'a': 3, 'c': 333}]))

        res = ((p1, p2) | "LeftJoin" >> utils.Join(by='a'))

        beam.assert_that(res, beam.equal_to([
            {'a': 1, 'b': 11, 'c': 111},
            {'a': 1, 'b': 11, 'c': 112},
            {'a': 2, 'b': 22},
            {'a': 3, 'b': 33, 'c': 333}]))

        # Run test pipeline
        p.run()
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)