You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/26 00:18:21 UTC
[1/2] beam git commit: Revert "Revert "Remove dataflow_test.py""
Repository: beam
Updated Branches:
refs/heads/python-sdk 4e1028b3d -> c6420df97
Revert "Revert "Remove dataflow_test.py""
This reverts commit 96fcc7d31c2540f867c3a73903c2aa99183a6b8b.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa7d47e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa7d47e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa7d47e
Branch: refs/heads/python-sdk
Commit: 2aa7d47e1491e0601b7b4d1476a8f182b2a14dc3
Parents: 4e1028b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 24 16:33:55 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 16:18:09 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 418 -------------------
.../apache_beam/transforms/ptransform_test.py | 67 +++
.../apache_beam/transforms/sideinputs_test.py | 208 ++++++++-
3 files changed, 274 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
deleted file mode 100644
index f410230..0000000
--- a/sdks/python/apache_beam/dataflow_test.py
+++ /dev/null
@@ -1,418 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for the dataflow package."""
-
-from __future__ import absolute_import
-
-import logging
-import re
-import unittest
-
-import apache_beam as beam
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter as AllOf
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import SideOutputValue
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import GroupByKey
-from apache_beam.transforms import Map
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import WindowInto
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
-from apache_beam.transforms.window import IntervalWindow
-from apache_beam.transforms.window import WindowFn
-from nose.plugins.attrib import attr
-
-
-class DataflowTest(unittest.TestCase):
- """Dataflow integration tests."""
-
- SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
- SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
-
- @beam.ptransform_fn
- def Count(pcoll): # pylint: disable=invalid-name, no-self-argument
- """A Count transform: v, ... => (v, n), ..."""
- return (pcoll
- | 'AddCount' >> Map(lambda x: (x, 1))
- | 'GroupCounts' >> GroupByKey()
- | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
-
- @attr('ValidatesRunner')
- def test_word_count(self):
- pipeline = TestPipeline()
- lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
- result = (
- (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
- .apply('CountWords', DataflowTest.Count))
- assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_map(self):
- pipeline = TestPipeline()
- lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
- result = (lines
- | 'upper' >> Map(str.upper)
- | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
- assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_par_do_with_side_input_as_arg(self):
- pipeline = TestPipeline()
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | 'SomeWords' >> Create(words_list)
- prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in
- suffix = 'zyx'
- result = words | FlatMap(
- 'DecorateWords',
- lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
- AsSingleton(prefix), suffix)
- assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_par_do_with_side_input_as_keyword_arg(self):
- pipeline = TestPipeline()
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | 'SomeWords' >> Create(words_list)
- prefix = 'zyx'
- suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
- result = words | FlatMap(
- 'DecorateWords',
- lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
- prefix, sfx=AsSingleton(suffix))
- assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_par_do_with_do_fn_object(self):
- class SomeDoFn(DoFn):
- """A custom DoFn for a FlatMap transform."""
-
- def process(self, context, prefix, suffix):
- return ['%s-%s-%s' % (prefix, context.element, suffix)]
-
- pipeline = TestPipeline()
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | 'SomeWords' >> Create(words_list)
- prefix = 'zyx'
- suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
- result = words | 'DecorateWordsDoFn' >> ParDo(
- SomeDoFn(), prefix, suffix=AsSingleton(suffix))
- assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_par_do_with_multiple_outputs_and_using_yield(self):
- class SomeDoFn(DoFn):
- """A custom DoFn using yield."""
-
- def process(self, context):
- yield context.element
- if context.element % 2 == 0:
- yield SideOutputValue('even', context.element)
- else:
- yield SideOutputValue('odd', context.element)
-
- pipeline = TestPipeline()
- nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
- results = nums | ParDo(
- 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
- assert_that(results.main, equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_par_do_with_multiple_outputs_and_using_return(self):
- def some_fn(v):
- if v % 2 == 0:
- return [v, SideOutputValue('even', v)]
- else:
- return [v, SideOutputValue('odd', v)]
-
- pipeline = TestPipeline()
- nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
- results = nums | FlatMap(
- 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
- assert_that(results.main, equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_empty_singleton_side_input(self):
- pipeline = TestPipeline()
- pcol = pipeline | 'start' >> Create([1, 2])
- side = pipeline | 'side' >> Create([]) # Empty side input.
-
- def my_fn(k, s):
- v = ('empty' if isinstance(s, EmptySideInput) else 'full')
- return [(k, v)]
- result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
- assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
- pipeline.run()
-
- # @attr('ValidatesRunner')
- # TODO(BEAM-1124): Temporarily disable it due to test failed running on
- # Dataflow service.
- def test_multi_valued_singleton_side_input(self):
- pipeline = TestPipeline()
- pcol = pipeline | 'start' >> Create([1, 2])
- side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
- pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
- with self.assertRaises(ValueError):
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_default_value_singleton_side_input(self):
- pipeline = TestPipeline()
- pcol = pipeline | 'start' >> Create([1, 2])
- side = pipeline | 'side' >> Create([]) # 0 values in side input.
- result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
- assert_that(result, equal_to([10, 20]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_iterable_side_input(self):
- pipeline = TestPipeline()
- pcol = pipeline | 'start' >> Create([1, 2])
- side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
- result = pcol | FlatMap('compute',
- lambda x, s: [x * y for y in s], AllOf(side))
- assert_that(result, equal_to([3, 4, 6, 8]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_undeclared_side_outputs(self):
- pipeline = TestPipeline()
- nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
- results = nums | FlatMap(
- 'ClassifyNumbers',
- lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
- ).with_outputs()
- assert_that(results[None], equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_empty_side_outputs(self):
- pipeline = TestPipeline()
- nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
- results = nums | FlatMap(
- 'ClassifyNumbers',
- lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
- ).with_outputs()
- assert_that(results[None], equal_to([1, 3, 5]))
- assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
- assert_that(results.even, equal_to([]), label='assert:even')
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_list_and_as_dict_side_inputs(self):
- a_list = [5, 1, 3, 2, 9]
- some_pairs = [('crouton', 17), ('supreme', None)]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
- side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
- results = main_input | FlatMap(
- 'concatenate',
- lambda x, the_list, the_dict: [[x, the_list, the_dict]],
- AsList(side_list), AsDict(side_pairs))
-
- def matcher(expected_elem, expected_list, expected_pairs):
- def match(actual):
- [[actual_elem, actual_list, actual_dict]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list)
- equal_to(expected_pairs)(actual_dict.iteritems())
- return match
-
- assert_that(results, matcher(1, a_list, some_pairs))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_singleton_without_unique_labels(self):
- # This should succeed as calling AsSingleton on the same PCollection twice
- # with the same defaults will return the same PCollectionView.
- a_list = [2]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton(side_list), AsSingleton(side_list))
-
- def matcher(expected_elem, expected_singleton):
- def match(actual):
- [[actual_elem, actual_singleton1, actual_singleton2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to([expected_singleton])([actual_singleton1])
- equal_to([expected_singleton])([actual_singleton2])
- return match
-
- assert_that(results, matcher(1, 2))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_singleton_with_different_defaults_without_unique_labels(self):
- # This should fail as AsSingleton with distinct default values should create
- # distinct PCollectionViews with the same full_label.
- a_list = [2]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
-
- with self.assertRaises(RuntimeError) as e:
- _ = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton(side_list), AsSingleton(side_list, default_value=3))
- self.assertTrue(
- e.exception.message.startswith(
- 'Transform "ViewAsSingleton(side list.None)" does not have a '
- 'stable unique label.'))
-
- @attr('ValidatesRunner')
- def test_as_singleton_with_different_defaults_with_unique_labels(self):
- a_list = []
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton('si1', side_list, default_value=2),
- AsSingleton('si2', side_list, default_value=3))
-
- def matcher(expected_elem, expected_singleton1, expected_singleton2):
- def match(actual):
- [[actual_elem, actual_singleton1, actual_singleton2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to([expected_singleton1])([actual_singleton1])
- equal_to([expected_singleton2])([actual_singleton2])
- return match
-
- assert_that(results, matcher(1, 2, 3))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_list_without_unique_labels(self):
- # This should succeed as calling AsList on the same PCollection twice will
- # return the same PCollectionView.
- a_list = [1, 2, 3]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, ls1, ls2: [[x, ls1, ls2]],
- AsList(side_list), AsList(side_list))
-
- def matcher(expected_elem, expected_list):
- def match(actual):
- [[actual_elem, actual_list1, actual_list2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list1)
- equal_to(expected_list)(actual_list2)
- return match
-
- assert_that(results, matcher(1, [1, 2, 3]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_list_with_unique_labels(self):
- a_list = [1, 2, 3]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_list = pipeline | 'side list' >> Create(a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, ls1, ls2: [[x, ls1, ls2]],
- AsList(side_list), AsList(side_list, label='label'))
-
- def matcher(expected_elem, expected_list):
- def match(actual):
- [[actual_elem, actual_list1, actual_list2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list1)
- equal_to(expected_list)(actual_list2)
- return match
-
- assert_that(results, matcher(1, [1, 2, 3]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_dict_with_unique_labels(self):
- some_kvs = [('a', 1), ('b', 2)]
- pipeline = TestPipeline()
- main_input = pipeline | 'main input' >> Create([1])
- side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
- results = main_input | FlatMap(
- 'test',
- lambda x, dct1, dct2: [[x, dct1, dct2]],
- AsDict(side_kvs), AsDict(side_kvs, label='label'))
-
- def matcher(expected_elem, expected_kvs):
- def match(actual):
- [[actual_elem, actual_dict1, actual_dict2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_kvs)(actual_dict1.iteritems())
- equal_to(expected_kvs)(actual_dict2.iteritems())
- return match
-
- assert_that(results, matcher(1, some_kvs))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_window_transform(self):
- class TestWindowFn(WindowFn):
- """Windowing function adding two disjoint windows to each element."""
-
- def assign(self, assign_context):
- _ = assign_context
- return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
-
- def merge(self, existing_windows):
- return existing_windows
-
- pipeline = TestPipeline()
- numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
- result = (numbers
- | 'W' >> WindowInto(windowfn=TestWindowFn())
- | 'G' >> GroupByKey())
- assert_that(
- result, equal_to([(1, [10]), (1, [10]), (2, [20]),
- (2, [20]), (3, [30]), (3, [30])]))
- pipeline.run()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 827bc83..68e4482 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -24,6 +24,7 @@ import re
import unittest
import hamcrest as hc
+from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.test_pipeline import TestPipeline
@@ -189,6 +190,72 @@ class PTransformTest(unittest.TestCase):
assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
pipeline.run()
+ @attr('ValidatesRunner')
+ def test_par_do_with_multiple_outputs_and_using_yield(self):
+ class SomeDoFn(beam.DoFn):
+ """A custom DoFn using yield."""
+
+ def process(self, context):
+ yield context.element
+ if context.element % 2 == 0:
+ yield pvalue.SideOutputValue('even', context.element)
+ else:
+ yield pvalue.SideOutputValue('odd', context.element)
+
+ pipeline = TestPipeline()
+ nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+ results = nums | beam.ParDo(
+ 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
+ assert_that(results.main, equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_par_do_with_multiple_outputs_and_using_return(self):
+ def some_fn(v):
+ if v % 2 == 0:
+ return [v, pvalue.SideOutputValue('even', v)]
+ else:
+ return [v, pvalue.SideOutputValue('odd', v)]
+
+ pipeline = TestPipeline()
+ nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+ results = nums | beam.FlatMap(
+ 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
+ assert_that(results.main, equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_undeclared_side_outputs(self):
+ pipeline = TestPipeline()
+ nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
+ results = nums | beam.FlatMap(
+ 'ClassifyNumbers',
+ lambda x: [x,
+ pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ ).with_outputs()
+ assert_that(results[None], equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_empty_side_outputs(self):
+ pipeline = TestPipeline()
+ nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
+ results = nums | beam.FlatMap(
+ 'ClassifyNumbers',
+ lambda x: [x,
+ pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ ).with_outputs()
+ assert_that(results[None], equal_to([1, 3, 5]))
+ assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+ assert_that(results.even, equal_to([]), label='assert:even')
+ pipeline.run()
+
def test_do_requires_do_fn_returning_iterable(self):
# This function is incorrect because it returns an object that isn't an
# iterable.
http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index a327dc8..9384e7b 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -20,7 +20,10 @@
import logging
import unittest
+from nose.plugins.attrib import attr
+
import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
from apache_beam.transforms import window
from apache_beam.transforms.util import assert_that, equal_to
@@ -28,7 +31,7 @@ from apache_beam.transforms.util import assert_that, equal_to
class SideInputsTest(unittest.TestCase):
def create_pipeline(self):
- return beam.Pipeline('DirectRunner')
+ return TestPipeline('DirectRunner')
def run_windowed_side_inputs(self, elements, main_window_fn,
side_window_fn=None,
@@ -125,6 +128,209 @@ class SideInputsTest(unittest.TestCase):
(11, {'k11': 'v11'}),
])
+ @attr('ValidatesRunner')
+ def test_empty_singleton_side_input(self):
+ pipeline = self.create_pipeline()
+ pcol = pipeline | 'start' >> beam.Create([1, 2])
+ side = pipeline | 'side' >> beam.Create([]) # Empty side input.
+
+ def my_fn(k, s):
+ # TODO(robertwb): Should this be an error as in Java?
+ v = ('empty' if isinstance(s, beam.pvalue.EmptySideInput) else 'full')
+ return [(k, v)]
+ result = pcol | 'compute' >> beam.FlatMap(
+ my_fn, beam.pvalue.AsSingleton(side))
+ assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+ pipeline.run()
+
+ # @attr('ValidatesRunner')
+ # TODO(BEAM-1124): Temporarily disabled because the test fails when run on
+ # the Dataflow service.
+ def test_multi_valued_singleton_side_input(self):
+ pipeline = self.create_pipeline()
+ pcol = pipeline | 'start' >> beam.Create([1, 2])
+ side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input.
+ pcol | 'compute' >> beam.FlatMap( # pylint: disable=expression-not-assigned
+ lambda x, s: [x * s], beam.pvalue.AsSingleton(side))
+ with self.assertRaises(ValueError):
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_default_value_singleton_side_input(self):
+ pipeline = self.create_pipeline()
+ pcol = pipeline | 'start' >> beam.Create([1, 2])
+ side = pipeline | 'side' >> beam.Create([]) # 0 values in side input.
+ result = pcol | beam.FlatMap(
+ lambda x, s: [x * s], beam.pvalue.AsSingleton(side, 10))
+ assert_that(result, equal_to([10, 20]))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_iterable_side_input(self):
+ pipeline = self.create_pipeline()
+ pcol = pipeline | 'start' >> beam.Create([1, 2])
+ side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input.
+ result = pcol | 'compute' >> beam.FlatMap(
+ lambda x, s: [x * y for y in s],
+ beam.pvalue.AsIter(side))
+ assert_that(result, equal_to([3, 4, 6, 8]))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_list_and_as_dict_side_inputs(self):
+ a_list = [5, 1, 3, 2, 9]
+ some_pairs = [('crouton', 17), ('supreme', None)]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+ side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs)
+ results = main_input | 'concatenate' >> beam.FlatMap(
+ lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+ beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs))
+
+ def matcher(expected_elem, expected_list, expected_pairs):
+ def match(actual):
+ [[actual_elem, actual_list, actual_dict]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list)
+ equal_to(expected_pairs)(actual_dict.iteritems())
+ return match
+
+ assert_that(results, matcher(1, a_list, some_pairs))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_singleton_without_unique_labels(self):
+ # This should succeed as calling beam.pvalue.AsSingleton on the same
+ # PCollection twice with the same defaults will return the same
+ # PCollectionView.
+ a_list = [2]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+ results = main_input | beam.FlatMap(
+ lambda x, s1, s2: [[x, s1, s2]],
+ beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list))
+
+ def matcher(expected_elem, expected_singleton):
+ def match(actual):
+ [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to([expected_singleton])([actual_singleton1])
+ equal_to([expected_singleton])([actual_singleton2])
+ return match
+
+ assert_that(results, matcher(1, 2))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_singleton_with_different_defaults_without_unique_labels(self):
+ # This should fail as beam.pvalue.AsSingleton with distinct default values
+ # should create distinct PCollectionViews with the same full_label.
+ a_list = [2]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+
+ with self.assertRaises(RuntimeError) as e:
+ _ = main_input | beam.FlatMap(
+ lambda x, s1, s2: [[x, s1, s2]],
+ beam.pvalue.AsSingleton(side_list),
+ beam.pvalue.AsSingleton(side_list, default_value=3))
+ self.assertTrue(
+ e.exception.message.startswith(
+ 'Transform "ViewAsSingleton(side list.None)" does not have a '
+ 'stable unique label.'))
+
+ @attr('ValidatesRunner')
+ def test_as_singleton_with_different_defaults_with_unique_labels(self):
+ a_list = []
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+ results = main_input | beam.FlatMap(
+ lambda x, s1, s2: [[x, s1, s2]],
+ beam.pvalue.AsSingleton('si1', side_list, default_value=2),
+ beam.pvalue.AsSingleton('si2', side_list, default_value=3))
+
+ def matcher(expected_elem, expected_singleton1, expected_singleton2):
+ def match(actual):
+ [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to([expected_singleton1])([actual_singleton1])
+ equal_to([expected_singleton2])([actual_singleton2])
+ return match
+
+ assert_that(results, matcher(1, 2, 3))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_list_without_unique_labels(self):
+ # This should succeed as calling beam.pvalue.AsList on the same
+ # PCollection twice will return the same PCollectionView.
+ a_list = [1, 2, 3]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+ results = main_input | beam.FlatMap(
+ lambda x, ls1, ls2: [[x, ls1, ls2]],
+ beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list))
+
+ def matcher(expected_elem, expected_list):
+ def match(actual):
+ [[actual_elem, actual_list1, actual_list2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list1)
+ equal_to(expected_list)(actual_list2)
+ return match
+
+ assert_that(results, matcher(1, [1, 2, 3]))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_list_with_unique_labels(self):
+ a_list = [1, 2, 3]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_list = pipeline | 'side list' >> beam.Create(a_list)
+ results = main_input | beam.FlatMap(
+ lambda x, ls1, ls2: [[x, ls1, ls2]],
+ beam.pvalue.AsList(side_list),
+ beam.pvalue.AsList(side_list, label='label'))
+
+ def matcher(expected_elem, expected_list):
+ def match(actual):
+ [[actual_elem, actual_list1, actual_list2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list1)
+ equal_to(expected_list)(actual_list2)
+ return match
+
+ assert_that(results, matcher(1, [1, 2, 3]))
+ pipeline.run()
+
+ @attr('ValidatesRunner')
+ def test_as_dict_with_unique_labels(self):
+ some_kvs = [('a', 1), ('b', 2)]
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([1])
+ side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
+ results = main_input | beam.FlatMap(
+ lambda x, dct1, dct2: [[x, dct1, dct2]],
+ beam.pvalue.AsDict(side_kvs),
+ beam.pvalue.AsDict(side_kvs, label='label'))
+
+ def matcher(expected_elem, expected_kvs):
+ def match(actual):
+ [[actual_elem, actual_dict1, actual_dict2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_kvs)(actual_dict1.iteritems())
+ equal_to(expected_kvs)(actual_dict2.iteritems())
+ return match
+
+ assert_that(results, matcher(1, some_kvs))
+ pipeline.run()
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
[2/2] beam git commit: Closes #1837
Posted by ro...@apache.org.
Closes #1837
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6420df9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6420df9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6420df9
Branch: refs/heads/python-sdk
Commit: c6420df9791eb6083fba1f74bd88e06ce8f6a61f
Parents: 4e1028b 2aa7d47
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jan 25 16:18:10 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jan 25 16:18:10 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 418 -------------------
.../apache_beam/transforms/ptransform_test.py | 67 +++
.../apache_beam/transforms/sideinputs_test.py | 208 ++++++++-
3 files changed, 274 insertions(+), 419 deletions(-)
----------------------------------------------------------------------