You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/24 17:58:34 UTC

[beam] branch master updated: [BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e70b068  [BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)
e70b068 is described below

commit e70b0688fa57ad381a97630710c75233a4bb72f7
Author: Daniel Oliveira <yo...@gmail.com>
AuthorDate: Wed Jan 24 09:58:30 2018 -0800

    [BEAM-3126] Adding a new Flatten test to Python SDK. (#4463)
---
 sdks/python/apache_beam/transforms/ptransform_test.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index fb20d8c..09ac72b 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -535,6 +535,24 @@ class PTransformTest(unittest.TestCase):
     with self.assertRaises(TypeError):
       set([1, 2, 3]) | beam.Flatten()
 
+  @attr('ValidatesRunner')
+  def test_flatten_multiple_pcollections_having_multiple_consumers(self):
+    pipeline = TestPipeline()
+    input = pipeline | 'Start' >> beam.Create(['AA', 'BBB', 'CC'])
+
+    def split_even_odd(element):
+      tag = 'even_length' if len(element) % 2 == 0 else 'odd_length'
+      return pvalue.TaggedOutput(tag, element)
+
+    even_length, odd_length = (input | beam.Map(split_even_odd)
+                               .with_outputs('even_length', 'odd_length'))
+    merged = (even_length, odd_length) | 'Flatten' >> beam.Flatten()
+
+    assert_that(merged, equal_to(['AA', 'BBB', 'CC']))
+    assert_that(even_length, equal_to(['AA', 'CC']), label='assert:even')
+    assert_that(odd_length, equal_to(['BBB']), label='assert:odd')
+    pipeline.run()
+
   def test_co_group_by_key_on_list(self):
     pipeline = TestPipeline()
     pcoll_1 = pipeline | 'Start 1' >> beam.Create(

-- 
To stop receiving notification emails like this one, please contact
robertwb@apache.org.