You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/24 17:58:34 UTC
[beam] branch master updated: [BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e70b068 [BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)
e70b068 is described below
commit e70b0688fa57ad381a97630710c75233a4bb72f7
Author: Daniel Oliveira <yo...@gmail.com>
AuthorDate: Wed Jan 24 09:58:30 2018 -0800
[BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)
---
sdks/python/apache_beam/transforms/ptransform_test.py | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index fb20d8c..09ac72b 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -535,6 +535,24 @@ class PTransformTest(unittest.TestCase):
with self.assertRaises(TypeError):
set([1, 2, 3]) | beam.Flatten()
+ @attr('ValidatesRunner')
+ def test_flatten_multiple_pcollections_having_multiple_consumers(self):
+ pipeline = TestPipeline()
+ input = pipeline | 'Start' >> beam.Create(['AA', 'BBB', 'CC'])
+
+ def split_even_odd(element):
+ tag = 'even_length' if len(element) % 2 == 0 else 'odd_length'
+ return pvalue.TaggedOutput(tag, element)
+
+ even_length, odd_length = (input | beam.Map(split_even_odd)
+ .with_outputs('even_length', 'odd_length'))
+ merged = (even_length, odd_length) | 'Flatten' >> beam.Flatten()
+
+ assert_that(merged, equal_to(['AA', 'BBB', 'CC']))
+ assert_that(even_length, equal_to(['AA', 'CC']), label='assert:even')
+ assert_that(odd_length, equal_to(['BBB']), label='assert:odd')
+ pipeline.run()
+
def test_co_group_by_key_on_list(self):
pipeline = TestPipeline()
pcoll_1 = pipeline | 'Start 1' >> beam.Create(
--
To stop receiving notification emails like this one, please contact
robertwb@apache.org.