You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/26 00:18:21 UTC

[1/2] beam git commit: Revert "Revert "Remove dataflow_test.py""

Repository: beam
Updated Branches:
  refs/heads/python-sdk 4e1028b3d -> c6420df97


Revert "Revert "Remove dataflow_test.py""

This reverts commit 96fcc7d31c2540f867c3a73903c2aa99183a6b8b.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa7d47e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa7d47e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa7d47e

Branch: refs/heads/python-sdk
Commit: 2aa7d47e1491e0601b7b4d1476a8f182b2a14dc3
Parents: 4e1028b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 24 16:33:55 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 16:18:09 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        | 418 -------------------
 .../apache_beam/transforms/ptransform_test.py   |  67 +++
 .../apache_beam/transforms/sideinputs_test.py   | 208 ++++++++-
 3 files changed, 274 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
deleted file mode 100644
index f410230..0000000
--- a/sdks/python/apache_beam/dataflow_test.py
+++ /dev/null
@@ -1,418 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for the dataflow package."""
-
-from __future__ import absolute_import
-
-import logging
-import re
-import unittest
-
-import apache_beam as beam
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter as AllOf
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import SideOutputValue
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import GroupByKey
-from apache_beam.transforms import Map
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
-from nose.plugins.attrib import attr
-
-
-class DataflowTest(unittest.TestCase):
-  """Dataflow integration tests."""
-
-  SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
-  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
-
-  @beam.ptransform_fn
-  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
-    """A Count transform: v, ... => (v, n), ..."""
-    return (pcoll
-            | 'AddCount' >> Map(lambda x: (x, 1))
-            | 'GroupCounts' >> GroupByKey()
-            | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
-
-  @attr('ValidatesRunner')
-  def test_word_count(self):
-    pipeline = TestPipeline()
-    lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
-    result = (
-        (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
-        .apply('CountWords', DataflowTest.Count))
-    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_map(self):
-    pipeline = TestPipeline()
-    lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
-    result = (lines
-              | 'upper' >> Map(str.upper)
-              | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
-    assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_side_input_as_arg(self):
-    pipeline = TestPipeline()
-    words_list = ['aa', 'bb', 'cc']
-    words = pipeline | 'SomeWords' >> Create(words_list)
-    prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-    suffix = 'zyx'
-    result = words | FlatMap(
-        'DecorateWords',
-        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
-        AsSingleton(prefix), suffix)
-    assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_side_input_as_keyword_arg(self):
-    pipeline = TestPipeline()
-    words_list = ['aa', 'bb', 'cc']
-    words = pipeline | 'SomeWords' >> Create(words_list)
-    prefix = 'zyx'
-    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-    result = words | FlatMap(
-        'DecorateWords',
-        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
-        prefix, sfx=AsSingleton(suffix))
-    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_do_fn_object(self):
-    class SomeDoFn(DoFn):
-      """A custom DoFn for a FlatMap transform."""
-
-      def process(self, context, prefix, suffix):
-        return ['%s-%s-%s' % (prefix, context.element, suffix)]
-
-    pipeline = TestPipeline()
-    words_list = ['aa', 'bb', 'cc']
-    words = pipeline | 'SomeWords' >> Create(words_list)
-    prefix = 'zyx'
-    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-    result = words | 'DecorateWordsDoFn' >> ParDo(
-        SomeDoFn(), prefix, suffix=AsSingleton(suffix))
-    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_multiple_outputs_and_using_yield(self):
-    class SomeDoFn(DoFn):
-      """A custom DoFn using yield."""
-
-      def process(self, context):
-        yield context.element
-        if context.element % 2 == 0:
-          yield SideOutputValue('even', context.element)
-        else:
-          yield SideOutputValue('odd', context.element)
-
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
-    results = nums | ParDo(
-        'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
-    assert_that(results.main, equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_par_do_with_multiple_outputs_and_using_return(self):
-    def some_fn(v):
-      if v % 2 == 0:
-        return [v, SideOutputValue('even', v)]
-      else:
-        return [v, SideOutputValue('odd', v)]
-
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
-    results = nums | FlatMap(
-        'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
-    assert_that(results.main, equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_empty_singleton_side_input(self):
-    pipeline = TestPipeline()
-    pcol = pipeline | 'start' >> Create([1, 2])
-    side = pipeline | 'side' >> Create([])  # Empty side input.
-
-    def my_fn(k, s):
-      v = ('empty' if isinstance(s, EmptySideInput) else 'full')
-      return [(k, v)]
-    result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
-    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
-    pipeline.run()
-
-  # @attr('ValidatesRunner')
-  # TODO(BEAM-1124): Temporarily disable it due to test failed running on
-  # Dataflow service.
-  def test_multi_valued_singleton_side_input(self):
-    pipeline = TestPipeline()
-    pcol = pipeline | 'start' >> Create([1, 2])
-    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
-    pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
-    with self.assertRaises(ValueError):
-      pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_default_value_singleton_side_input(self):
-    pipeline = TestPipeline()
-    pcol = pipeline | 'start' >> Create([1, 2])
-    side = pipeline | 'side' >> Create([])  # 0 values in side input.
-    result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
-    assert_that(result, equal_to([10, 20]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_iterable_side_input(self):
-    pipeline = TestPipeline()
-    pcol = pipeline | 'start' >> Create([1, 2])
-    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
-    result = pcol | FlatMap('compute',
-                            lambda x, s: [x * y for y in s], AllOf(side))
-    assert_that(result, equal_to([3, 4, 6, 8]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_undeclared_side_outputs(self):
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
-    results = nums | FlatMap(
-        'ClassifyNumbers',
-        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    assert_that(results[None], equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_empty_side_outputs(self):
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
-    results = nums | FlatMap(
-        'ClassifyNumbers',
-        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    assert_that(results[None], equal_to([1, 3, 5]))
-    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
-    assert_that(results.even, equal_to([]), label='assert:even')
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_list_and_as_dict_side_inputs(self):
-    a_list = [5, 1, 3, 2, 9]
-    some_pairs = [('crouton', 17), ('supreme', None)]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-    side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
-    results = main_input | FlatMap(
-        'concatenate',
-        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
-        AsList(side_list), AsDict(side_pairs))
-
-    def  matcher(expected_elem, expected_list, expected_pairs):
-      def match(actual):
-        [[actual_elem, actual_list, actual_dict]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list)
-        equal_to(expected_pairs)(actual_dict.iteritems())
-      return match
-
-    assert_that(results, matcher(1, a_list, some_pairs))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_singleton_without_unique_labels(self):
-    # This should succeed as calling AsSingleton on the same PCollection twice
-    # with the same defaults will return the same PCollectionView.
-    a_list = [2]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-    results = main_input | FlatMap(
-        'test',
-        lambda x, s1, s2: [[x, s1, s2]],
-        AsSingleton(side_list), AsSingleton(side_list))
-
-    def  matcher(expected_elem, expected_singleton):
-      def match(actual):
-        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to([expected_singleton])([actual_singleton1])
-        equal_to([expected_singleton])([actual_singleton2])
-      return match
-
-    assert_that(results, matcher(1, 2))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_without_unique_labels(self):
-    # This should fail as AsSingleton with distinct default values should create
-    # distinct PCollectionViews with the same full_label.
-    a_list = [2]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-
-    with self.assertRaises(RuntimeError) as e:
-      _ = main_input | FlatMap(
-          'test',
-          lambda x, s1, s2: [[x, s1, s2]],
-          AsSingleton(side_list), AsSingleton(side_list, default_value=3))
-    self.assertTrue(
-        e.exception.message.startswith(
-            'Transform "ViewAsSingleton(side list.None)" does not have a '
-            'stable unique label.'))
-
-  @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_with_unique_labels(self):
-    a_list = []
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-    results = main_input | FlatMap(
-        'test',
-        lambda x, s1, s2: [[x, s1, s2]],
-        AsSingleton('si1', side_list, default_value=2),
-        AsSingleton('si2', side_list, default_value=3))
-
-    def  matcher(expected_elem, expected_singleton1, expected_singleton2):
-      def match(actual):
-        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to([expected_singleton1])([actual_singleton1])
-        equal_to([expected_singleton2])([actual_singleton2])
-      return match
-
-    assert_that(results, matcher(1, 2, 3))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_list_without_unique_labels(self):
-    # This should succeed as calling AsList on the same PCollection twice will
-    # return the same PCollectionView.
-    a_list = [1, 2, 3]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-    results = main_input | FlatMap(
-        'test',
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
-        AsList(side_list), AsList(side_list))
-
-    def  matcher(expected_elem, expected_list):
-      def match(actual):
-        [[actual_elem, actual_list1, actual_list2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list1)
-        equal_to(expected_list)(actual_list2)
-      return match
-
-    assert_that(results, matcher(1, [1, 2, 3]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_list_with_unique_labels(self):
-    a_list = [1, 2, 3]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_list = pipeline | 'side list' >> Create(a_list)
-    results = main_input | FlatMap(
-        'test',
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
-        AsList(side_list), AsList(side_list, label='label'))
-
-    def  matcher(expected_elem, expected_list):
-      def match(actual):
-        [[actual_elem, actual_list1, actual_list2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list1)
-        equal_to(expected_list)(actual_list2)
-      return match
-
-    assert_that(results, matcher(1, [1, 2, 3]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_dict_with_unique_labels(self):
-    some_kvs = [('a', 1), ('b', 2)]
-    pipeline = TestPipeline()
-    main_input = pipeline | 'main input' >> Create([1])
-    side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
-    results = main_input | FlatMap(
-        'test',
-        lambda x, dct1, dct2: [[x, dct1, dct2]],
-        AsDict(side_kvs), AsDict(side_kvs, label='label'))
-
-    def  matcher(expected_elem, expected_kvs):
-      def match(actual):
-        [[actual_elem, actual_dict1, actual_dict2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_kvs)(actual_dict1.iteritems())
-        equal_to(expected_kvs)(actual_dict2.iteritems())
-      return match
-
-    assert_that(results, matcher(1, some_kvs))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_window_transform(self):
-    class TestWindowFn(WindowFn):
-      """Windowing function adding two disjoint windows to each element."""
-
-      def assign(self, assign_context):
-        _ = assign_context
-        return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
-
-      def merge(self, existing_windows):
-        return existing_windows
-
-    pipeline = TestPipeline()
-    numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
-    result = (numbers
-              | 'W' >> WindowInto(windowfn=TestWindowFn())
-              | 'G' >> GroupByKey())
-    assert_that(
-        result, equal_to([(1, [10]), (1, [10]), (2, [20]),
-                          (2, [20]), (3, [30]), (3, [30])]))
-    pipeline.run()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 827bc83..68e4482 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -24,6 +24,7 @@ import re
 import unittest
 
 import hamcrest as hc
+from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.test_pipeline import TestPipeline
@@ -189,6 +190,72 @@ class PTransformTest(unittest.TestCase):
     assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
     pipeline.run()
 
+  @attr('ValidatesRunner')
+  def test_par_do_with_multiple_outputs_and_using_yield(self):
+    class SomeDoFn(beam.DoFn):
+      """A custom DoFn using yield."""
+
+      def process(self, context):
+        yield context.element
+        if context.element % 2 == 0:
+          yield pvalue.SideOutputValue('even', context.element)
+        else:
+          yield pvalue.SideOutputValue('odd', context.element)
+
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+    results = nums | beam.ParDo(
+        'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_par_do_with_multiple_outputs_and_using_return(self):
+    def some_fn(v):
+      if v % 2 == 0:
+        return [v, pvalue.SideOutputValue('even', v)]
+      else:
+        return [v, pvalue.SideOutputValue('odd', v)]
+
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+    results = nums | beam.FlatMap(
+        'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_undeclared_side_outputs(self):
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+    results = nums | beam.FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x,
+                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    assert_that(results[None], equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_empty_side_outputs(self):
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
+    results = nums | beam.FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x,
+                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    assert_that(results[None], equal_to([1, 3, 5]))
+    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+    assert_that(results.even, equal_to([]), label='assert:even')
+    pipeline.run()
+
   def test_do_requires_do_fn_returning_iterable(self):
     # This function is incorrect because it returns an object that isn't an
     # iterable.

http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index a327dc8..9384e7b 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -20,7 +20,10 @@
 import logging
 import unittest
 
+from nose.plugins.attrib import attr
+
 import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 from apache_beam.transforms.util import assert_that, equal_to
 
@@ -28,7 +31,7 @@ from apache_beam.transforms.util import assert_that, equal_to
 class SideInputsTest(unittest.TestCase):
 
   def create_pipeline(self):
-    return beam.Pipeline('DirectRunner')
+    return TestPipeline('DirectRunner')
 
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,
@@ -125,6 +128,209 @@ class SideInputsTest(unittest.TestCase):
             (11, {'k11': 'v11'}),
         ])
 
+  @attr('ValidatesRunner')
+  def test_empty_singleton_side_input(self):
+    pipeline = self.create_pipeline()
+    pcol = pipeline | 'start' >> beam.Create([1, 2])
+    side = pipeline | 'side' >> beam.Create([])  # Empty side input.
+
+    def my_fn(k, s):
+      # TODO(robertwb): Should this be an error as in Java?
+      v = ('empty' if isinstance(s, beam.pvalue.EmptySideInput) else 'full')
+      return [(k, v)]
+    result = pcol | 'compute' >> beam.FlatMap(
+        my_fn, beam.pvalue.AsSingleton(side))
+    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+    pipeline.run()
+
+  # @attr('ValidatesRunner')
+  # TODO(BEAM-1124): Temporarily disabled because the test fails when run on
+  # the Dataflow service.
+  def test_multi_valued_singleton_side_input(self):
+    pipeline = self.create_pipeline()
+    pcol = pipeline | 'start' >> beam.Create([1, 2])
+    side = pipeline | 'side' >> beam.Create([3, 4])  # 2 values in side input.
+    pcol | 'compute' >> beam.FlatMap(  # pylint: disable=expression-not-assigned
+        lambda x, s: [x * s], beam.pvalue.AsSingleton(side))
+    with self.assertRaises(ValueError):
+      pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_default_value_singleton_side_input(self):
+    pipeline = self.create_pipeline()
+    pcol = pipeline | 'start' >> beam.Create([1, 2])
+    side = pipeline | 'side' >> beam.Create([])  # 0 values in side input.
+    result = pcol | beam.FlatMap(
+        lambda x, s: [x * s], beam.pvalue.AsSingleton(side, 10))
+    assert_that(result, equal_to([10, 20]))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_iterable_side_input(self):
+    pipeline = self.create_pipeline()
+    pcol = pipeline | 'start' >> beam.Create([1, 2])
+    side = pipeline | 'side' >> beam.Create([3, 4])  # 2 values in side input.
+    result = pcol | 'compute' >> beam.FlatMap(
+        lambda x, s: [x * y for y in s],
+        beam.pvalue.AsIter(side))
+    assert_that(result, equal_to([3, 4, 6, 8]))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_list_and_as_dict_side_inputs(self):
+    a_list = [5, 1, 3, 2, 9]
+    some_pairs = [('crouton', 17), ('supreme', None)]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+    side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs)
+    results = main_input | 'concatenate' >> beam.FlatMap(
+        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+        beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs))
+
+    def  matcher(expected_elem, expected_list, expected_pairs):
+      def match(actual):
+        [[actual_elem, actual_list, actual_dict]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list)
+        equal_to(expected_pairs)(actual_dict.iteritems())
+      return match
+
+    assert_that(results, matcher(1, a_list, some_pairs))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_singleton_without_unique_labels(self):
+    # This should succeed as calling beam.pvalue.AsSingleton on the same
+    # PCollection twice with the same defaults will return the same
+    # PCollectionView.
+    a_list = [2]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+    results = main_input | beam.FlatMap(
+        lambda x, s1, s2: [[x, s1, s2]],
+        beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list))
+
+    def  matcher(expected_elem, expected_singleton):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton])([actual_singleton1])
+        equal_to([expected_singleton])([actual_singleton2])
+      return match
+
+    assert_that(results, matcher(1, 2))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_singleton_with_different_defaults_without_unique_labels(self):
+    # This should fail as beam.pvalue.AsSingleton with distinct default values
+    # should create distinct PCollectionViews with the same full_label.
+    a_list = [2]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+
+    with self.assertRaises(RuntimeError) as e:
+      _ = main_input | beam.FlatMap(
+          lambda x, s1, s2: [[x, s1, s2]],
+          beam.pvalue.AsSingleton(side_list),
+          beam.pvalue.AsSingleton(side_list, default_value=3))
+    self.assertTrue(
+        e.exception.message.startswith(
+            'Transform "ViewAsSingleton(side list.None)" does not have a '
+            'stable unique label.'))
+
+  @attr('ValidatesRunner')
+  def test_as_singleton_with_different_defaults_with_unique_labels(self):
+    a_list = []
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+    results = main_input | beam.FlatMap(
+        lambda x, s1, s2: [[x, s1, s2]],
+        beam.pvalue.AsSingleton('si1', side_list, default_value=2),
+        beam.pvalue.AsSingleton('si2', side_list, default_value=3))
+
+    def  matcher(expected_elem, expected_singleton1, expected_singleton2):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton1])([actual_singleton1])
+        equal_to([expected_singleton2])([actual_singleton2])
+      return match
+
+    assert_that(results, matcher(1, 2, 3))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_list_without_unique_labels(self):
+    # This should succeed as calling beam.pvalue.AsList on the same
+    # PCollection twice will return the same PCollectionView.
+    a_list = [1, 2, 3]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+    results = main_input | beam.FlatMap(
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list))
+
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+
+    assert_that(results, matcher(1, [1, 2, 3]))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_list_with_unique_labels(self):
+    a_list = [1, 2, 3]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_list = pipeline | 'side list' >> beam.Create(a_list)
+    results = main_input | beam.FlatMap(
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        beam.pvalue.AsList(side_list),
+        beam.pvalue.AsList(side_list, label='label'))
+
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+
+    assert_that(results, matcher(1, [1, 2, 3]))
+    pipeline.run()
+
+  @attr('ValidatesRunner')
+  def test_as_dict_with_unique_labels(self):
+    some_kvs = [('a', 1), ('b', 2)]
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([1])
+    side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
+    results = main_input | beam.FlatMap(
+        lambda x, dct1, dct2: [[x, dct1, dct2]],
+        beam.pvalue.AsDict(side_kvs),
+        beam.pvalue.AsDict(side_kvs, label='label'))
+
+    def  matcher(expected_elem, expected_kvs):
+      def match(actual):
+        [[actual_elem, actual_dict1, actual_dict2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_kvs)(actual_dict1.iteritems())
+        equal_to(expected_kvs)(actual_dict2.iteritems())
+      return match
+
+    assert_that(results, matcher(1, some_kvs))
+    pipeline.run()
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)


[2/2] beam git commit: Closes #1837

Posted by ro...@apache.org.
Closes #1837


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6420df9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6420df9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6420df9

Branch: refs/heads/python-sdk
Commit: c6420df9791eb6083fba1f74bd88e06ce8f6a61f
Parents: 4e1028b 2aa7d47
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jan 25 16:18:10 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 16:18:10 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        | 418 -------------------
 .../apache_beam/transforms/ptransform_test.py   |  67 +++
 .../apache_beam/transforms/sideinputs_test.py   | 208 ++++++++-
 3 files changed, 274 insertions(+), 419 deletions(-)
----------------------------------------------------------------------