Posted to user@beam.apache.org by Sofia’s World <mm...@gmail.com> on 2020/07/13 21:35:46 UTC

Testing Apache Beam pipelines / python SDK

Hi all,
I was wondering if anyone could provide pointers on how to test a
pipeline in Python.
I have the following pipeline:

lines = (p
         | 'Get List of Tickers' >> beam.Map(get_tickers)
         | 'Split fields'  >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year)
         )
result = p.run()

Now I can easily test each individual function for each step
(get_tickers, split_fields, add_year),

but is there a way to test the pipeline 'as a whole'?

Could anyone point me to some examples?

Kind regards

Re: Testing Apache Beam pipelines / python SDK

Posted by Robert Bradshaw <ro...@google.com>.
If you don't want to write to an actual file, the example with the Check
transform should allow you to use Check(...) as you would a sink. (I
realize the call should have been

  run_my_pipeline(
      beam.Create([...]),
      "Check1" >> Check(equal_to([...])),
      "Check2" >> Check(any_callable_that_validates_result2))

to give the two checks distinct labels.)

As for BigQuery, yes, those are integration tests that write to real
BigQuery. Alternatively, you can check that the PCollection you were going
to write contains the right elements.



Re: Testing Apache Beam pipelines / python SDK

Posted by Sofia’s World <mm...@gmail.com>.
Hello Robert,
could you point me to a test sample where a 'mock' sink is used?
Do you have a testing package that provides an in-memory sink where,
for example, I can dump the result of
my pipeline (as opposed to writing to a file)?
Additionally, what is the best way to test writing to BigQuery?
I have seen this file
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
but it appears to write to real BigQuery?

kind regards
 Marco








Re: Testing Apache Beam pipelines / python SDK

Posted by Robert Bradshaw <ro...@google.com>.
If you want a full end-to-end integration test of your pipeline, what you can do is:

1) Write your input data to temporary files.
2) Run your pipeline, which writes its output somewhere (ideally a
temp location as well).
3) Open up the outputs and check that they are as expected.

This is similar to the test at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
, but a bit heavyweight.

Another way to validate your pipeline is to refactor the code so the
inputs and outputs are pluggable. For example, you could write

def run_my_pipeline(input):
   [all your pipeline logic goes here]
   [this could also be wrapped up as a PTransform]
   return result1, result2

def main(...):
  with beam.Pipeline(...) as p:
    input = p | beam.io.ReadFromText(...)
    result1, result2 = run_my_pipeline(input)
    result1 | beam.io.WriteToSomewhere(...)
    result2 | beam.io.WriteToSomewhereElse(...)

def test():
  with beam.Pipeline(...) as p:
    input = p | beam.Create(...)
    result1, result2 = run_my_pipeline(input)
    assert_that(result1, equal_to([...]))
    assert_that(result2, any_callable_that_validates_result2, label="Check2")

You could also parameterize things on your sinks and sources, e.g.

def run_my_pipeline(source, sink1, sink2):
   with beam.Pipeline(...) as p:
     input = p | source
     ...
     result1 | sink1
     result2 | sink2

def main(...):
  run_my_pipeline(
      beam.io.ReadFromText(...),
      beam.io.WriteToSomewhere(...),
      beam.io.WriteToSomewhereElse(...))

def test():

  class Check(beam.PTransform):
    def __init__(self, checker):
      super().__init__()
      self._checker = checker
    def expand(self, pcoll):
      assert_that(pcoll, self._checker)

  run_my_pipeline(
      beam.Create([...]),
      Check1(equal_to([...])),
      Check2(any_callable_that_validates_result2))

or various permutations thereof.

Is that more what you're looking for?




Re: Testing Apache Beam pipelines / python SDK

Posted by Sofia’s World <mm...@gmail.com>.
Thanks Kyle / Robert... your answers gave me exactly the information I was after.
Kind regards,
  Marco


Re: Testing Apache Beam pipelines / python SDK

Posted by Kyle Weaver <kc...@google.com>.
Correction: you need to use `with` to actually run your pipeline:

with beam.Pipeline(options=pipeline_options) as p:
  lines = (p
           | 'Get List of Tickers' >> ReadFromText(input_file)
           | 'Split fields'  >> beam.Map(split_fields)
           | 'Map to String' >> beam.Map(add_year)
           )

  assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))




Re: Testing Apache Beam pipelines / python SDK

Posted by Kyle Weaver <kc...@google.com>.
> I had a look at the util_test.py, and i see that in those tests pipelines
> are being created as part of tests., and  in these tests what are being
> tested are beam functions - eg beam.Map  etc.

assert_that checks the results of an entire pipeline, not individual
transforms. You should be able to apply assert_that to your example:

p = beam.Pipeline(options=pipeline_options)
lines = (p
         | 'Get List of Tickers' >> ReadFromText(input_file)
         | 'Split fields'  >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year)
         )

assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))




Re: Testing Apache Beam pipelines / python SDK

Posted by Sofia’s World <mm...@gmail.com>.
Hello Robert,
thanks, but I think I am either missing the point or not expressing my goal
clearly.
I had a look at util_test.py, and I see that in those tests pipelines
are created as part of the tests, and that what is being tested are Beam
transforms, e.g. beam.Map.
I am after testing a pipeline as a whole. Taking this example,

p = beam.Pipeline(options=pipeline_options)
lines = (p
         | 'Get List of Tickers' >> ReadFromText(input_file)
         | 'Split fields'  >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year)
         )

what I am trying to do is to test a full pipeline run, as in the
test example below:

from mypackage.email_pipeline import run

@patch('testing.email_pipeline.ReadFromText')
def test_create_pipelne(self, mock_read_from_text):
    mock_read_from_text.return_value = ['One',
                                        'Two',
                                        'Three']

    test_pipeline = TestPipeline(is_integration_test=True)
    pipeline_verifiers = [
        PipelineStateMatcher(),
    ]
    extra_opts = {
        'input_table': 'testtable',
        'num_records': 1,
        'beam_bq_source': 'source',
        'on_success_matcher': all_of(*pipeline_verifiers)
    }
    result = run(
        test_pipeline.get_full_options_as_args(**extra_opts))

    print(result)

Basically, I would expect a PCollection as the result of the pipeline, and
I would test the contents of that PCollection.

Running this results in this message:

IT is skipped because --test-pipeline-options is not specified

Would you be able to advise on this?

kind regards

 Marco













Re: Testing Apache Beam pipelines / python SDK

Posted by Robert Bradshaw <ro...@google.com>.
You can use apache_beam.testing.util.assert_that to write tests of
Beam pipelines. This is what Beam uses for its tests, e.g.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
