Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:05 UTC
[30/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
new file mode 100644
index 0000000..00b6c8d
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -0,0 +1,1814 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the PTransform and descendants."""
+
+from __future__ import absolute_import
+
+import operator
+import re
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pipeline import Pipeline
+import google.cloud.dataflow.pvalue as pvalue
+import google.cloud.dataflow.transforms.combiners as combine
+from google.cloud.dataflow.transforms.ptransform import PTransform
+from google.cloud.dataflow.transforms.util import assert_that, equal_to
+import google.cloud.dataflow.typehints as typehints
+from google.cloud.dataflow.typehints import with_input_types
+from google.cloud.dataflow.typehints import with_output_types
+from google.cloud.dataflow.typehints.typehints_test import TypeHintTestCase
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.options import TypeOptions
+
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+
+
+class PTransformTest(unittest.TestCase):
+
+ def assertStartswith(self, msg, prefix):
+ self.assertTrue(msg.startswith(prefix),
+ '"%s" does not start with "%s"' % (msg, prefix))
+
+ def test_str(self):
+ self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
+ str(PTransform()))
+
+ pa = Pipeline('DirectPipelineRunner')
+ res = pa | df.Create('a_label', [1, 2])
+ self.assertEqual('<Create(PTransform) label=[a_label]>',
+ str(res.producer.transform))
+
+ pc = Pipeline('DirectPipelineRunner')
+ res = pc | df.Create('with_inputs', [1, 2])
+ inputs_tr = res.producer.transform
+ inputs_tr.inputs = ('ci',)
+ self.assertEqual(
+ """<Create(PTransform) label=[with_inputs] inputs=('ci',)>""",
+ str(inputs_tr))
+
+ pd = Pipeline('DirectPipelineRunner')
+ res = pd | df.Create('with_sidei', [1, 2])
+ side_tr = res.producer.transform
+ side_tr.side_inputs = (4,)
+ self.assertEqual(
+ '<Create(PTransform) label=[with_sidei] side_inputs=(4,)>',
+ str(side_tr))
+
+ inputs_tr.side_inputs = ('cs',)
+ self.assertEqual(
+ """<Create(PTransform) label=[with_inputs] """
+ """inputs=('ci',) side_inputs=('cs',)>""",
+ str(inputs_tr))
+
+ def test_parse_label_and_arg(self):
+
+ def fun(*args, **kwargs):
+ return PTransform().parse_label_and_arg(args, kwargs, 'name')
+
+ self.assertEqual(('PTransform', 'value'), fun('value'))
+ self.assertEqual(('PTransform', 'value'), fun(name='value'))
+ self.assertEqual(('label', 'value'), fun('label', 'value'))
+ self.assertEqual(('label', 'value'), fun('label', name='value'))
+ self.assertEqual(('label', 'value'), fun('value', label='label'))
+ self.assertEqual(('label', 'value'), fun(name='value', label='label'))
+
+ self.assertRaises(ValueError, fun)
+ self.assertRaises(ValueError, fun, 0, 'value')
+ self.assertRaises(ValueError, fun, label=0, name='value')
+ self.assertRaises(ValueError, fun, other='value')
+
+ with self.assertRaises(ValueError) as cm:
+ fun(0, name='value')
+ self.assertEqual(
+ cm.exception.message,
+ 'PTransform expects a (label, name) or (name) argument list '
+ 'instead of args=(0,), kwargs={\'name\': \'value\'}')
+
+ def test_do_with_do_fn(self):
+ class AddNDoFn(df.DoFn):
+
+ def process(self, context, addon):
+ return [context.element + addon]
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ result = pcoll | df.ParDo('do', AddNDoFn(), 10)
+ assert_that(result, equal_to([11, 12, 13]))
+ pipeline.run()
+
+ def test_do_with_unconstructed_do_fn(self):
+ class MyDoFn(df.DoFn):
+
+ def process(self, context):
+ pass
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ with self.assertRaises(ValueError):
+ pcoll | df.ParDo('do', MyDoFn) # Note the lack of ()'s
+
+ def test_do_with_callable(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ result = pcoll | df.FlatMap('do', lambda x, addon: [x + addon], 10)
+ assert_that(result, equal_to([11, 12, 13]))
+ pipeline.run()
+
+ def test_do_with_side_input_as_arg(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ side = pipeline | df.Create('side', [10])
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ result = pcoll | df.FlatMap(
+ 'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
+ assert_that(result, equal_to([11, 12, 13]))
+ pipeline.run()
+
+ def test_do_with_side_input_as_keyword_arg(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ side = pipeline | df.Create('side', [10])
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ result = pcoll | df.FlatMap(
+ 'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
+ assert_that(result, equal_to([11, 12, 13]))
+ pipeline.run()
+
+ def test_do_with_do_fn_returning_string_raises_warning(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', ['2', '9', '3'])
+ pcoll | df.FlatMap('do', lambda x: x + '1')
+
+    # Since the DoFn directly returns a string, we should get an error warning
+    # us about it.
+ with self.assertRaises(typehints.TypeCheckError) as cm:
+ pipeline.run()
+
+ expected_error_prefix = ('Returning a str from a ParDo or FlatMap '
+ 'is discouraged.')
+ self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+ def test_do_with_do_fn_returning_dict_raises_warning(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', ['2', '9', '3'])
+ pcoll | df.FlatMap('do', lambda x: {x: '1'})
+
+    # Since the DoFn directly returns a dict, we should get an error warning
+    # us about it.
+ with self.assertRaises(typehints.TypeCheckError) as cm:
+ pipeline.run()
+
+ expected_error_prefix = ('Returning a dict from a ParDo or FlatMap '
+ 'is discouraged.')
+ self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+ def test_do_with_side_outputs_maintains_unique_name(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
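+    # with_outputs(main='m') renames the main output, exposing it as the '.m'
+    # attribute of each result.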
+ r1 = pcoll | df.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
+ r2 = pcoll | df.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
+ assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
+ assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
+ pipeline.run()
+
+ def test_do_requires_do_fn_returning_iterable(self):
+ # This function is incorrect because it returns an object that isn't an
+ # iterable.
+ def incorrect_par_do_fn(x):
+ return x + 5
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [2, 9, 3])
+ pcoll | df.FlatMap('do', incorrect_par_do_fn)
+    # All user-defined functions passed to a ParDo must return an iterable.
+ with self.assertRaises(typehints.TypeCheckError) as cm:
+ pipeline.run()
+
+ expected_error_prefix = 'FlatMap and ParDo must return an iterable.'
+ self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+ def test_do_fn_with_start_finish(self):
+ class MyDoFn(df.DoFn):
+ def start_bundle(self, c):
+ yield 'start'
+ def process(self, c):
+ pass
+ def finish_bundle(self, c):
+ yield 'finish'
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3])
+ result = pcoll | df.ParDo('do', MyDoFn())
+
+ # May have many bundles, but each has a start and finish.
+ def matcher():
+ def match(actual):
+ equal_to(['start', 'finish'])(list(set(actual)))
+ equal_to([actual.count('start')])([actual.count('finish')])
+ return match
+
+ assert_that(result, matcher())
+ pipeline.run()
+
+ def test_filter(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [1, 2, 3, 4])
+ result = pcoll | df.Filter(
+ 'filter', lambda x: x % 2 == 0)
+ assert_that(result, equal_to([2, 4]))
+ pipeline.run()
+
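+  # A CombineFn implementing the standard combiner lifecycle:
+  # create_accumulator() starts a (sum, count) pair, add_input() folds in one
+  # element, merge_accumulators() combines partial results, and
+  # extract_output() produces the final mean.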
+ class _MeanCombineFn(df.CombineFn):
+
+ def create_accumulator(self):
+ return (0, 0)
+
+ def add_input(self, (sum_, count), element):
+ return sum_ + element, count + 1
+
+ def merge_accumulators(self, accumulators):
+ sums, counts = zip(*accumulators)
+ return sum(sums), sum(counts)
+
+ def extract_output(self, (sum_, count)):
+ if not count:
+ return float('nan')
+ return sum_ / float(count)
+
+ def test_combine_with_combine_fn(self):
+ vals = [1, 2, 3, 4, 5, 6, 7]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', vals)
+ result = pcoll | df.CombineGlobally('mean', self._MeanCombineFn())
+ assert_that(result, equal_to([sum(vals) / len(vals)]))
+ pipeline.run()
+
+ def test_combine_with_callable(self):
+ vals = [1, 2, 3, 4, 5, 6, 7]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', vals)
+ result = pcoll | df.CombineGlobally(sum)
+ assert_that(result, equal_to([sum(vals)]))
+ pipeline.run()
+
+ def test_combine_with_side_input_as_arg(self):
+ values = [1, 2, 3, 4, 5, 6, 7]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', values)
+ divisor = pipeline | df.Create('divisor', [2])
+ result = pcoll | df.CombineGlobally(
+ 'max',
+ # Multiples of divisor only.
+ lambda vals, d: max(v for v in vals if v % d == 0),
+ pvalue.AsSingleton(divisor)).without_defaults()
+ filt_vals = [v for v in values if v % 2 == 0]
+ assert_that(result, equal_to([max(filt_vals)]))
+ pipeline.run()
+
+ def test_combine_per_key_with_combine_fn(self):
+ vals_1 = [1, 2, 3, 4, 5, 6, 7]
+ vals_2 = [2, 4, 6, 8, 10, 12, 14]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+ [('b', x) for x in vals_2]))
+ result = pcoll | df.CombinePerKey('mean', self._MeanCombineFn())
+ assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
+ ('b', sum(vals_2) / len(vals_2))]))
+ pipeline.run()
+
+ def test_combine_per_key_with_callable(self):
+ vals_1 = [1, 2, 3, 4, 5, 6, 7]
+ vals_2 = [2, 4, 6, 8, 10, 12, 14]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+ [('b', x) for x in vals_2]))
+ result = pcoll | df.CombinePerKey(sum)
+ assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
+ pipeline.run()
+
+ def test_combine_per_key_with_side_input_as_arg(self):
+ vals_1 = [1, 2, 3, 4, 5, 6, 7]
+ vals_2 = [2, 4, 6, 8, 10, 12, 14]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+ [('b', x) for x in vals_2]))
+ divisor = pipeline | df.Create('divisor', [2])
+ result = pcoll | df.CombinePerKey(
+ lambda vals, d: max(v for v in vals if v % d == 0),
+ pvalue.AsSingleton(divisor)) # Multiples of divisor only.
+ m_1 = max(v for v in vals_1 if v % 2 == 0)
+ m_2 = max(v for v in vals_2 if v % 2 == 0)
+ assert_that(result, equal_to([('a', m_1), ('b', m_2)]))
+ pipeline.run()
+
+ def test_group_by_key(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create(
+ 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
+ result = pcoll | df.GroupByKey('group')
+ assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
+ pipeline.run()
+
+ def test_partition_with_partition_fn(self):
+
+ class SomePartitionFn(df.PartitionFn):
+
+ def partition_for(self, context, num_partitions, offset):
+ return (context.element % 3) + offset
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+ # Attempt nominal partition operation.
+ partitions = pcoll | df.Partition('part1', SomePartitionFn(), 4, 1)
+ assert_that(partitions[0], equal_to([]))
+ assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
+ assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
+ assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
+ pipeline.run()
+
+    # Check that an out-of-range partition number yields an error. For the
+ # DirectPipelineRunner, this error manifests as an exception.
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+ partitions = pcoll | df.Partition('part2', SomePartitionFn(), 4, 10000)
+ with self.assertRaises(ValueError):
+ pipeline.run()
+
+ def test_partition_with_callable(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+ partitions = (
+ pcoll | df.Partition(
+ 'part',
+ lambda e, n, offset: (e % 3) + offset, 4,
+ 1))
+ assert_that(partitions[0], equal_to([]))
+ assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
+ assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
+ assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
+ pipeline.run()
+
+ def test_partition_followed_by_flatten_and_groupbykey(self):
+ """Regression test for an issue with how partitions are handled."""
+ pipeline = Pipeline('DirectPipelineRunner')
+ contents = [('aa', 1), ('bb', 2), ('aa', 2)]
+ created = pipeline | df.Create('A', contents)
+ partitioned = created | df.Partition('B', lambda x, n: len(x) % n, 3)
+ flattened = partitioned | df.Flatten('C')
+ grouped = flattened | df.GroupByKey('D')
+ assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
+ pipeline.run()
+
+ def test_flatten_pcollections(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
+ pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
+ result = (pcoll_1, pcoll_2) | df.Flatten('flatten')
+ assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
+ pipeline.run()
+
+ def test_flatten_no_pcollections(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ with self.assertRaises(ValueError):
+ () | df.Flatten('pipeline arg missing')
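+    # An empty tuple carries no reference to a pipeline, so one must be
+    # passed explicitly for Flatten to attach to.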
+ result = () | df.Flatten('empty', pipeline=pipeline)
+ assert_that(result, equal_to([]))
+ pipeline.run()
+
+ def test_flatten_pcollections_in_iterable(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
+ pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
+ result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
+ | df.Flatten('flatten'))
+ assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
+ pipeline.run()
+
+ def test_flatten_input_type_must_be_iterable(self):
+ # Inputs to flatten *must* be an iterable.
+ with self.assertRaises(ValueError):
+ 4 | df.Flatten('flatten')
+
+ def test_flatten_input_type_must_be_iterable_of_pcolls(self):
+ # Inputs to flatten *must* be an iterable of PCollections.
+ with self.assertRaises(TypeError):
+ {'l': 'test'} | df.Flatten('flatten')
+ with self.assertRaises(TypeError):
+ set([1, 2, 3]) | df.Flatten('flatten')
+
+ def test_co_group_by_key_on_list(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll_1 = pipeline | df.Create(
+ 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | df.Create(
+ 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = (pcoll_1, pcoll_2) | df.CoGroupByKey('cgbk')
+ assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
+ ('b', ([3], [])),
+ ('c', ([4], [7, 8]))]))
+ pipeline.run()
+
+ def test_co_group_by_key_on_iterable(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll_1 = pipeline | df.Create(
+ 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | df.Create(
+ 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = ([pc for pc in (pcoll_1, pcoll_2)]
+ | df.CoGroupByKey('cgbk'))
+ assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
+ ('b', ([3], [])),
+ ('c', ([4], [7, 8]))]))
+ pipeline.run()
+
+ def test_co_group_by_key_on_dict(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll_1 = pipeline | df.Create(
+ 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | df.Create(
+ 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+ result = {'X': pcoll_1, 'Y': pcoll_2} | df.CoGroupByKey('cgbk')
+ assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
+ ('b', {'X': [3], 'Y': []}),
+ ('c', {'X': [4], 'Y': [7, 8]})]))
+ pipeline.run()
+
+ def test_group_by_key_input_must_be_kv_pairs(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcolls = pipeline | df.Create('A', [1, 2, 3, 4, 5])
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ pcolls | df.GroupByKey('D')
+ pipeline.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ 'Runtime type violation detected within '
+ 'ParDo(D/reify_windows): Input to GroupByKey must be '
+ 'a PCollection with elements compatible with KV[A, B]')
+
+ def test_group_by_key_only_input_must_be_kv_pairs(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcolls = pipeline | df.Create('A', ['a', 'b', 'f'])
+ with self.assertRaises(typehints.TypeCheckError) as cm:
+ pcolls | df.GroupByKeyOnly('D')
+ pipeline.run()
+
+ expected_error_prefix = ('Input to GroupByKeyOnly must be a PCollection of '
+ 'windowed key-value pairs.')
+ self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+ def test_keys_and_values(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create(
+ 'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
+ keys = pcoll.apply('keys', df.Keys())
+ vals = pcoll.apply('vals', df.Values())
+ assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
+ assert_that(vals, equal_to([1, 1, 1, 2, 2, 3]), label='assert:vals')
+ pipeline.run()
+
+ def test_kv_swap(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create(
+ 'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
+ result = pcoll.apply('swap', df.KvSwap())
+ assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
+ pipeline.run()
+
+ def test_remove_duplicates(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create(
+ 'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+ result = pcoll.apply('nodupes', df.RemoveDuplicates())
+ assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
+ pipeline.run()
+
+ def test_chained_ptransforms(self):
+ pipeline = Pipeline('DirectPipelineRunner')
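+    # Transforms chained with '|' before being applied to a PCollection
+    # compose into a single PTransform.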
+ t = (df.Map(lambda x: (x, 1))
+ | df.GroupByKey()
+ | df.Map(lambda (x, ones): (x, sum(ones))))
+ result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
+ assert_that(result, equal_to([('a', 2), ('b', 1)]))
+ pipeline.run()
+
+ def test_apply_to_list(self):
+ self.assertEqual([1, 2, 3], [0, 1, 2] | df.Map('add_one', lambda x: x + 1))
+ self.assertEqual([1], [0, 1, 2] | df.Filter('odd', lambda x: x % 2))
+ self.assertEqual([1, 2, 3, 100],
+ ([1, 2, 3], [100]) | df.Flatten('flat'))
+ join_input = ([('k', 'a')],
+ [('k', 'b'), ('k', 'c')])
+ self.assertEqual([('k', (['a'], ['b', 'c']))],
+ join_input | df.CoGroupByKey('join'))
+
+ def test_multi_input_ptransform(self):
+ class DisjointUnion(PTransform):
+ def apply(self, pcollections):
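+        # Distinct union: flatten the inputs, key each element with None,
+        # group to collapse duplicates, then keep only the keys.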
+ return (pcollections
+ | df.Flatten()
+ | df.Map(lambda x: (x, None))
+ | df.GroupByKey()
+ | df.Map(lambda (x, _): x))
+ self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
+
+ def test_apply_to_crazy_pvaluish(self):
+ class NestedFlatten(PTransform):
+ """A PTransform taking and returning nested PValueish.
+
+ Takes as input a list of dicts, and returns a dict with the corresponding
+ values flattened.
+ """
+ def _extract_input_pvalues(self, pvalueish):
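+        # Return the pvalueish itself (as a list) along with a flat list of
+        # every PCollection found inside the dicts.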
+ pvalueish = list(pvalueish)
+ return pvalueish, sum([list(p.values()) for p in pvalueish], [])
+ def apply(self, pcoll_dicts):
+ keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
+ res = {}
+ for k in keys:
+ res[k] = [p[k] for p in pcoll_dicts if k in p] | df.Flatten(k)
+ return res
+ res = [{'a': [1, 2, 3]},
+ {'a': [4, 5, 6], 'b': ['x', 'y', 'z']},
+ {'a': [7, 8], 'b': ['x', 'y'], 'c': []}] | NestedFlatten()
+ self.assertEqual(3, len(res))
+ self.assertEqual([1, 2, 3, 4, 5, 6, 7, 8], sorted(res['a']))
+ self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b']))
+ self.assertEqual([], sorted(res['c']))
+
+@df.ptransform_fn
+def SamplePTransform(label, pcoll, context, *args, **kwargs):
+ """Sample transform using the @ptransform_fn decorator."""
+ _ = label, args, kwargs
+ map_transform = df.Map('ToPairs', lambda v: (v, None))
+ combine_transform = df.CombinePerKey('Group', lambda vs: None)
+ keys_transform = df.Keys('RemoveDuplicates')
+ context.extend([map_transform, combine_transform, keys_transform])
+ return pcoll | map_transform | combine_transform | keys_transform
+
+
+class PTransformLabelsTest(unittest.TestCase):
+
+ class CustomTransform(df.PTransform):
+
+ pardo = None
+
+ def apply(self, pcoll):
+ self.pardo = df.FlatMap('*do*', lambda x: [x + 1])
+ return pcoll | self.pardo
+
+ def test_chained_ptransforms(self):
+ """Tests that chaining gets proper nesting."""
+ pipeline = Pipeline('DirectPipelineRunner')
+ map1 = df.Map('map1', lambda x: (x, 1))
+ gbk = df.GroupByKey('gbk')
+ map2 = df.Map('map2', lambda (x, ones): (x, sum(ones)))
+ t = (map1 | gbk | map2)
+ result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
+ self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
+ self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
+ self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
+ assert_that(result, equal_to([('a', 2), ('b', 1)]))
+ pipeline.run()
+
+ def test_apply_custom_transform_without_label(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+ custom = PTransformLabelsTest.CustomTransform()
+ result = pipeline.apply(custom, pcoll)
+ self.assertTrue('CustomTransform' in pipeline.applied_labels)
+ self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels)
+ assert_that(result, equal_to([2, 3, 4]))
+ pipeline.run()
+
+ def test_apply_custom_transform_with_label(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+ custom = PTransformLabelsTest.CustomTransform('*custom*')
+ result = pipeline.apply(custom, pcoll)
+ self.assertTrue('*custom*' in pipeline.applied_labels)
+ self.assertTrue('*custom*/*do*' in pipeline.applied_labels)
+ assert_that(result, equal_to([2, 3, 4]))
+ pipeline.run()
+
+ def test_combine_without_label(self):
+ vals = [1, 2, 3, 4, 5, 6, 7]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', vals)
+ combine = df.CombineGlobally(sum)
+ result = pcoll | combine
+ self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
+ assert_that(result, equal_to([sum(vals)]))
+ pipeline.run()
+
+ def test_apply_ptransform_using_decorator(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+ context = []
+ sample = SamplePTransform('*sample*', context)
+ _ = pcoll | sample
+ self.assertTrue('*sample*' in pipeline.applied_labels)
+ self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)
+ self.assertTrue('*sample*/Group' in pipeline.applied_labels)
+ self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels)
+
+ def test_combine_with_label(self):
+ vals = [1, 2, 3, 4, 5, 6, 7]
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcoll = pipeline | df.Create('start', vals)
+ combine = df.CombineGlobally('*sum*', sum)
+ result = pcoll | combine
+ self.assertTrue('*sum*' in pipeline.applied_labels)
+ assert_that(result, equal_to([sum(vals)]))
+ pipeline.run()
+
+ def check_label(self, ptransform, expected_label):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pipeline | df.Create('start', [('a', 1)]) | ptransform
+ actual_label = sorted(pipeline.applied_labels - {'start'})[0]
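+    # Lambdas embed their source line number in the default label; normalize
+    # runs of three or more digits to '#' so the comparison stays stable.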
+ self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
+
+ def test_default_labels(self):
+ self.check_label(df.Map(len), r'Map(len)')
+ self.check_label(df.Map(lambda x: x),
+ r'Map(<lambda at ptransform_test.py:#>)')
+ self.check_label(df.FlatMap(list), r'FlatMap(list)')
+ self.check_label(df.Filter(sum), r'Filter(sum)')
+ self.check_label(df.CombineGlobally(sum), r'CombineGlobally(sum)')
+ self.check_label(df.CombinePerKey(sum), r'CombinePerKey(sum)')
+
+ class MyDoFn(df.DoFn):
+ def process(self, context):
+ pass
+
+ self.check_label(df.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
+
+
+class PTransformTypeCheckTestCase(TypeHintTestCase):
+
+ def assertStartswith(self, msg, prefix):
+ self.assertTrue(msg.startswith(prefix),
+ '"%s" does not start with "%s"' % (msg, prefix))
+
+ def setUp(self):
+ self.p = Pipeline(options=PipelineOptions([]))
+
+ def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
+ @with_input_types(int, int)
+ @with_output_types(typehints.List[int])
+ class AddWithFive(df.DoFn):
+ def process(self, context, five):
+ return [context.element + five]
+
+ d = (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.ParDo('add', AddWithFive(), 5))
+
+ assert_that(d, equal_to([6, 7, 8]))
+ self.p.run()
+
+ def test_do_fn_pipeline_pipeline_type_check_violated(self):
+ @with_input_types(str, str)
+ @with_output_types(typehints.List[str])
+ class ToUpperCaseWithPrefix(df.DoFn):
+ def process(self, context, prefix):
+ return [prefix + context.element.upper()]
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
+
+ self.assertEqual("Type hint violation for 'upper': "
+ "requires <type 'str'> but got <type 'int'> for context",
+ e.exception.message)
+
+ def test_do_fn_pipeline_runtime_type_check_satisfied(self):
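+    # Unlike the default construction-time (pipeline) checking,
+    # runtime_type_check also verifies actual element types while the
+    # pipeline executes.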
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_input_types(int, int)
+ @with_output_types(int)
+ class AddWithNum(df.DoFn):
+ def process(self, context, num):
+ return [context.element + num]
+
+ d = (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.ParDo('add', AddWithNum(), 5))
+
+ assert_that(d, equal_to([6, 7, 8]))
+ self.p.run()
+
+ def test_do_fn_pipeline_runtime_type_check_violated(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_input_types(int, int)
+ @with_output_types(typehints.List[int])
+ class AddWithNum(df.DoFn):
+ def process(self, context, num):
+ return [context.element + num]
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('t', ['1', '2', '3']).with_output_types(str)
+ | df.ParDo('add', AddWithNum(), 5))
+ self.p.run()
+
+ self.assertEqual("Type hint violation for 'add': "
+ "requires <type 'int'> but got <type 'str'> for context",
+ e.exception.message)
+
+ def test_pardo_does_not_type_check_using_type_hint_decorators(self):
+ @with_input_types(a=int)
+ @with_output_types(typehints.List[str])
+ def int_to_str(a):
+ return [str(a)]
+
+ # The function above is expecting an int for its only parameter. However, it
+ # will receive a str instead, which should result in a raised exception.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ c = (self.p
+ | df.Create('s', ['b', 'a', 'r']).with_output_types(str)
+ | df.FlatMap('to str', int_to_str))
+
+ self.assertEqual("Type hint violation for 'to str': "
+ "requires <type 'int'> but got <type 'str'> for a",
+ e.exception.message)
+
+ def test_pardo_properly_type_checks_using_type_hint_decorators(self):
+ @with_input_types(a=str)
+ @with_output_types(typehints.List[str])
+ def to_all_upper_case(a):
+ return [a.upper()]
+
+    # If this type-checks then no error should be raised.
+ d = (self.p
+ | df.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
+ | df.FlatMap('case', to_all_upper_case))
+ assert_that(d, equal_to(['T', 'E', 'S', 'T']))
+ self.p.run()
+
+    # The output type should have been deduced as 'str' rather than List[str],
+    # since FlatMap flattens the returned iterable.
+ self.assertEqual(str, d.element_type)
+
+ def test_pardo_does_not_type_check_using_type_hint_methods(self):
+    # The first ParDo outputs PCollections of type int; however, the second
+    # ParDo expects PCollections of type str instead.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+ | (df.FlatMap('score', lambda x: [1] if x == 't' else [2])
+ .with_input_types(str).with_output_types(int))
+ | (df.FlatMap('upper', lambda x: [x.upper()])
+ .with_input_types(str).with_output_types(str)))
+
+ self.assertEqual("Type hint violation for 'upper': "
+ "requires <type 'str'> but got <type 'int'> for x",
+ e.exception.message)
+
+ def test_pardo_properly_type_checks_using_type_hint_methods(self):
+ # Pipeline should be created successfully without an error
+ d = (self.p
+ | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+ | df.FlatMap('dup', lambda x: [x + x])
+ .with_input_types(str).with_output_types(str)
+ | df.FlatMap('upper', lambda x: [x.upper()])
+ .with_input_types(str).with_output_types(str))
+
+ assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
+ self.p.run()
+
+ def test_map_does_not_type_check_using_type_hints_methods(self):
+    # The transform before 'Map' has indicated that it outputs PCollections of
+    # ints, while Map expects a PCollection of strs.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+ | df.Map('upper', lambda x: x.upper()).with_input_types(str).with_output_types(str))
+
+ self.assertEqual("Type hint violation for 'upper': "
+ "requires <type 'str'> but got <type 'int'> for x",
+ e.exception.message)
+
+ def test_map_properly_type_checks_using_type_hints_methods(self):
+ # No error should be raised if this type-checks properly.
+ d = (self.p
+ | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+ | df.Map('to_str', lambda x: str(x)).with_input_types(int).with_output_types(str))
+ assert_that(d, equal_to(['1', '2', '3', '4']))
+ self.p.run()
+
+ def test_map_does_not_type_check_using_type_hints_decorator(self):
+ @with_input_types(s=str)
+ @with_output_types(str)
+ def upper(s):
+ return s.upper()
+
+ # Hinted function above expects a str at pipeline construction.
+ # However, 'Map' should detect that Create has hinted an int instead.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+ | df.Map('upper', upper))
+
+ self.assertEqual("Type hint violation for 'upper': "
+ "requires <type 'str'> but got <type 'int'> for s",
+ e.exception.message)
+
+ def test_map_properly_type_checks_using_type_hints_decorator(self):
+ @with_input_types(a=bool)
+ @with_output_types(int)
+ def bool_to_int(a):
+ return int(a)
+
+    # If this type-checks then no error should be raised.
+ d = (self.p
+ | df.Create('bools', [True, False, True]).with_output_types(bool)
+ | df.Map('to_ints', bool_to_int))
+ assert_that(d, equal_to([1, 0, 1]))
+ self.p.run()
+
+ def test_filter_does_not_type_check_using_type_hints_method(self):
+ # Filter is expecting an int but instead looks to the 'left' and sees a str
+ # incoming.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
+ | df.Map('lower', lambda x: x.lower()).with_input_types(str).with_output_types(str)
+ | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
+
+ self.assertEqual("Type hint violation for 'below 3': "
+ "requires <type 'int'> but got <type 'str'> for x",
+ e.exception.message)
+
+ def test_filter_type_checks_using_type_hints_method(self):
+ # No error should be raised if this type-checks properly.
+ d = (self.p
+ | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
+ | df.Map('to int', lambda x: int(x)).with_input_types(str).with_output_types(int)
+ | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
+ assert_that(d, equal_to([1, 2]))
+ self.p.run()
+
+ def test_filter_does_not_type_check_using_type_hints_decorator(self):
+ @with_input_types(a=float)
+ def more_than_half(a):
+ return a > 0.50
+
+ # Func above was hinted to only take a float, yet an int will be passed.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('ints', [1, 2, 3, 4]).with_output_types(int)
+ | df.Filter('half', more_than_half))
+
+ self.assertEqual("Type hint violation for 'half': "
+ "requires <type 'float'> but got <type 'int'> for a",
+ e.exception.message)
+
+ def test_filter_type_checks_using_type_hints_decorator(self):
+ @with_input_types(b=int)
+ def half(b):
+ import random
+ return bool(random.choice([0, 1]))
+
+ # Filter should deduce that it returns the same type that it takes.
+ (self.p
+ | df.Create('str', range(5)).with_output_types(int)
+ | df.Filter('half', half)
+ | df.Map('to bool', lambda x: bool(x)).with_input_types(int).with_output_types(bool))
+
+ def test_group_by_key_only_output_type_deduction(self):
+ d = (self.p
+ | df.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
+ | (df.Map('pair', lambda x: (x, ord(x)))
+ .with_output_types(typehints.KV[str, str]))
+ | df.GroupByKeyOnly('O'))
+
+ # Output type should correctly be deduced.
+ # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
+ self.assertCompatible(typehints.KV[str, typehints.Iterable[str]],
+ d.element_type)
+
+ def test_group_by_key_output_type_deduction(self):
+ d = (self.p
+ | df.Create('str', range(20)).with_output_types(int)
+ | (df.Map('pair negative', lambda x: (x % 5, -x))
+ .with_output_types(typehints.KV[int, int]))
+ | df.GroupByKey('T'))
+
+ # Output type should correctly be deduced.
+ # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
+ self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+ d.element_type)
+
+ def test_group_by_key_only_does_not_type_check(self):
+    # GBK will be passed raw ints here instead of some form of KV[A, B].
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('s', [1, 2, 3]).with_output_types(int)
+ | df.GroupByKeyOnly('F'))
+
+ self.assertEqual("Input type hint violation at F: "
+ "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+ "got <type 'int'>",
+ e.exception.message)
+
+ def test_group_by_does_not_type_check(self):
+    # Create is hinted to return an Iterable[int] rather than the KV[A, B]
+    # (an alias for Tuple[A, B]) that GroupByKey requires.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | (df.Create('s', range(5))
+ .with_output_types(typehints.Iterable[int]))
+ | df.GroupByKey('T'))
+
+ self.assertEqual("Input type hint violation at T: "
+ "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+ "got Iterable[int]",
+ e.exception.message)
+
+ def test_pipeline_checking_pardo_insufficient_type_information(self):
+ self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+ # Type checking is enabled, but 'Create' doesn't pass on any relevant type
+ # information to the ParDo.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('nums', range(5))
+ | df.FlatMap('mod dup', lambda x: (x % 2, x)))
+
+ self.assertEqual('Pipeline type checking is enabled, however no output '
+ 'type-hint was found for the PTransform Create(nums)',
+ e.exception.message)
+
+ def test_pipeline_checking_gbk_insufficient_type_information(self):
+ self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+ # Type checking is enabled, but 'Map' doesn't pass on any relevant type
+ # information to GBK-only.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('nums', range(5)).with_output_types(int)
+ | df.Map('mod dup', lambda x: (x % 2, x))
+ | df.GroupByKeyOnly('G'))
+
+ self.assertEqual('Pipeline type checking is enabled, however no output '
+ 'type-hint was found for the PTransform '
+ 'ParDo(mod dup)',
+ e.exception.message)
+
+ def test_disable_pipeline_type_check(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The pipeline below would fail type checking; however, pipeline type
+    # checking was disabled above, so no error is raised at construction.
+ (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.Map('lower', lambda x: x.lower()).with_input_types(str).with_output_types(str))
+
+ def test_run_time_type_checking_enabled_type_violation(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_output_types(str)
+ @with_input_types(x=int)
+ def int_to_string(x):
+ return str(x)
+
+ # Function above has been type-hinted to only accept an int. But in the
+ # pipeline execution it'll be passed a string due to the output of Create.
+ (self.p
+ | df.Create('t', ['some_string'])
+ | df.Map('to str', int_to_string))
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(to str): "
+ "Type-hint for argument: 'x' violated. "
+ "Expected an instance of <type 'int'>, "
+ "instead found some_string, an instance of <type 'str'>.")
+
+ def test_run_time_type_checking_enabled_types_satisfied(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_output_types(typehints.KV[int, str])
+ @with_input_types(x=str)
+ def group_with_upper_ord(x):
+ return (ord(x.upper()) % 5, x)
+
+ # Pipeline checking is off, but the above function should satisfy types at
+ # run-time.
+ result = (self.p
+ | df.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g']).with_output_types(str)
+ | df.Map('gen keys', group_with_upper_ord)
+ | df.GroupByKey('O'))
+
+ assert_that(result, equal_to([(1, ['g']),
+ (3, ['s', 'i', 'n']),
+ (4, ['t', 'e', 't'])]))
+ self.p.run()
+
+ def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_output_types(typehints.KV[bool, int])
+ @with_input_types(a=int)
+ def is_even_as_key(a):
+      # Simulate a programming error; this should be: return (a % 2 == 0, a).
+      # However, it returns KV[int, int].
+ return (a % 2, a)
+
+ (self.p
+ | df.Create('nums', range(5)).with_output_types(int)
+ | df.Map('is even', is_even_as_key)
+ | df.GroupByKey('parity'))
+
+    # Although all the types appear to be correct when checked at pipeline
+    # construction, runtime type-checking should detect that 'is_even_as_key'
+    # is returning Tuple[int, int] instead of Tuple[bool, int].
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(is even): "
+ "Tuple[bool, int] hint type-constraint violated. "
+ "The type of element #0 in the passed tuple is incorrect. "
+ "Expected an instance of type bool, "
+ "instead received an instance of type int.")
+
+ def test_pipeline_checking_satisfied_run_time_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+ @with_output_types(typehints.KV[bool, int])
+ @with_input_types(a=int)
+ def is_even_as_key(a):
+ # The programming error in the above test-case has now been fixed.
+ # Everything should properly type-check.
+ return (a % 2 == 0, a)
+
+ result = (self.p
+ | df.Create('nums', range(5)).with_output_types(int)
+ | df.Map('is even', is_even_as_key)
+ | df.GroupByKey('parity'))
+
+ assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
+ self.p.run()
+
+ def test_pipeline_runtime_checking_violation_simple_type_input(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hint applied via the 'with_input_types()' method indicates that
+    # the ParDo should receive an instance of type 'str'; however, an 'int'
+    # will be passed instead.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p | df.Create('n', [1, 2, 3])
+ | (df.FlatMap('to int', lambda x: [int(x)])
+ .with_input_types(str).with_output_types(int))
+ )
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(to int): "
+ "Type-hint for argument: 'x' violated. "
+ "Expected an instance of <type 'str'>, "
+ "instead found 1, an instance of <type 'int'>.")
+
+ def test_pipeline_runtime_checking_violation_composite_type_input(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
+ | (df.FlatMap('add', lambda (x, y): [x + y])
+ .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
+ )
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(add): "
+ "Type-hint for argument: 'y' violated. "
+ "Expected an instance of <type 'int'>, "
+ "instead found 3.0, an instance of <type 'float'>.")
+
+ def test_pipeline_runtime_checking_violation_simple_type_output(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hint applied via the 'with_output_types()' method indicates the
+    # ParDo should output an instance of type 'int'; however, a 'float' will
+    # be generated instead.
+ print "HINTS", df.FlatMap(
+ 'to int',
+ lambda x: [float(x)]).with_input_types(int).with_output_types(
+ int).get_type_hints()
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p | df.Create('n', [1, 2, 3])
+ | (df.FlatMap('to int', lambda x: [float(x)])
+ .with_input_types(int).with_output_types(int))
+ )
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within "
+ "ParDo(to int): "
+ "According to type-hint expected output should be "
+ "of type <type 'int'>. Instead, received '1.0', "
+ "an instance of type <type 'float'>.")
+
+ def test_pipeline_runtime_checking_violation_composite_type_output(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hint applied via the 'with_output_types()' method indicates the
+    # ParDo should return an instance of type Tuple[float, int]. However, an
+    # instance of 'float' will be generated instead.
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
+ | (df.FlatMap('swap', lambda (x, y): [x + y])
+ .with_input_types(typehints.Tuple[int, float])
+ .with_output_types(typehints.Tuple[float, int]))
+ )
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within "
+ "ParDo(swap): Tuple type constraint violated. "
+ "Valid object instance must be of type 'tuple'. Instead, "
+ "an instance of 'float' was received.")
+
+  def test_pipeline_runtime_checking_violation_with_side_inputs_decorator(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ @with_output_types(int)
+ @with_input_types(a=int, b=int)
+ def add(a, b):
+ return a + b
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p | df.Create('t', [1, 2, 3, 4]) | df.Map('add 1', add, 1.0))
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(add 1): "
+ "Type-hint for argument: 'b' violated. "
+ "Expected an instance of <type 'int'>, "
+ "instead found 1.0, an instance of <type 'float'>.")
+
+  def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('t', [1, 2, 3, 4])
+ | (df.Map('add 1', lambda x, one: x + one, 1.0)
+ .with_input_types(int, int)
+ .with_output_types(float)))
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within ParDo(add 1): "
+ "Type-hint for argument: 'one' violated. "
+ "Expected an instance of <type 'int'>, "
+ "instead found 1.0, an instance of <type 'float'>.")
+
+ def test_combine_properly_pipeline_type_checks_using_decorator(self):
+ @with_output_types(int)
+ @with_input_types(ints=typehints.Iterable[int])
+ def sum_ints(ints):
+ return sum(ints)
+
+ d = (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.CombineGlobally('sum', sum_ints))
+
+ self.assertEqual(int, d.element_type)
+ assert_that(d, equal_to([6]))
+ self.p.run()
+
+ def test_combine_func_type_hint_does_not_take_iterable_using_decorator(self):
+ @with_output_types(int)
+ @with_input_types(a=int)
+ def bad_combine(a):
+      return 5 + a
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('m', [1, 2, 3]).with_output_types(int)
+ | df.CombineGlobally('add', bad_combine))
+
+ self.assertEqual(
+ "All functions for a Combine PTransform must accept a "
+ "single argument compatible with: Iterable[Any]. "
+ "Instead a function with input type: <type 'int'> was received.",
+ e.exception.message)
+
+ def test_combine_pipeline_type_propagation_using_decorators(self):
+ @with_output_types(int)
+ @with_input_types(ints=typehints.Iterable[int])
+ def sum_ints(ints):
+ return sum(ints)
+
+ @with_output_types(typehints.List[int])
+ @with_input_types(n=int)
+ def range_from_zero(n):
+ return list(range(n+1))
+
+ d = (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.CombineGlobally('sum', sum_ints)
+ | df.ParDo('range', range_from_zero))
+
+ self.assertEqual(int, d.element_type)
+ assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
+ self.p.run()
+
+ def test_combine_runtime_type_check_satisfied_using_decorators(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+ @with_output_types(int)
+ @with_input_types(ints=typehints.Iterable[int])
+ def iter_mul(ints):
+ return reduce(operator.mul, ints, 1)
+
+ d = (self.p
+ | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
+ | df.CombineGlobally('mul', iter_mul))
+
+ assert_that(d, equal_to([625]))
+ self.p.run()
+
+ def test_combine_runtime_type_check_violation_using_decorators(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    # The combine fn is returning the incorrect type.
+ @with_output_types(int)
+ @with_input_types(ints=typehints.Iterable[int])
+ def iter_mul(ints):
+ return str(reduce(operator.mul, ints, 1))
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
+ | df.CombineGlobally('mul', iter_mul))
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within "
+ "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+ "Tuple[TypeVariable[K], int] hint type-constraint violated. "
+ "The type of element #1 in the passed tuple is incorrect. "
+ "Expected an instance of type int, "
+ "instead received an instance of type str.")
+
+ def test_combine_pipeline_type_check_using_methods(self):
+ d = (self.p
+ | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+ | (df.CombineGlobally('concat', lambda s: ''.join(s))
+ .with_input_types(str).with_output_types(str)))
+
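+    # Concatenation order is non-deterministic, so the matcher compares the
+    # characters of the single result string ignoring order.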
+ def matcher(expected):
+ def match(actual):
+ equal_to(expected)(list(actual[0]))
+ return match
+ assert_that(d, matcher('estt'))
+ self.p.run()
+
+ def test_combine_runtime_type_check_using_methods(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('s', range(5)).with_output_types(int)
+ | (df.CombineGlobally('sum', lambda s: sum(s))
+ .with_input_types(int).with_output_types(int)))
+
+ assert_that(d, equal_to([10]))
+ self.p.run()
+
+ def test_combine_pipeline_type_check_violation_using_methods(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('e', range(3)).with_output_types(int)
+ | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+ .with_input_types(str).with_output_types(str)))
+
+ self.assertEqual("Input type hint violation at sort join: "
+ "expected <type 'str'>, got <type 'int'>",
+ e.exception.message)
+
+ def test_combine_runtime_type_check_violation_using_methods(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('e', range(3)).with_output_types(int)
+ | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+ .with_input_types(str).with_output_types(str)))
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within "
+ "ParDo(sort join/KeyWithVoid): "
+ "Type-hint for argument: 'v' violated. "
+ "Expected an instance of <type 'str'>, "
+ "instead found 0, an instance of <type 'int'>.")
+
+ def test_combine_insufficient_type_hint_information(self):
+ self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('e', range(3)).with_output_types(int)
+ | df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+ | df.Map('f', lambda x: x + 1))
+
+ self.assertEqual(
+ 'Pipeline type checking is enabled, '
+ 'however no output type-hint was found for the PTransform '
+ 'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+ e.exception.message)
+
+ def test_mean_globally_pipeline_checking_satisfied(self):
+ d = (self.p
+ | df.Create('c', range(5)).with_output_types(int)
+ | combine.Mean.Globally('mean'))
+
+ self.assertTrue(d.element_type is float)
+ assert_that(d, equal_to([2.0]))
+ self.p.run()
+
+ def test_mean_globally_pipeline_checking_violated(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ d = (self.p
+ | df.Create('c', ['test']).with_output_types(str)
+ | combine.Mean.Globally('mean'))
+
+ self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+ "requires Tuple[TypeVariable[K], "
+ "Iterable[Union[float, int, long]]] "
+ "but got Tuple[None, Iterable[str]] for p_context",
+ e.exception.message)
+
+ def test_mean_globally_runtime_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('c', range(5)).with_output_types(int)
+ | combine.Mean.Globally('mean'))
+
+ self.assertTrue(d.element_type is float)
+ assert_that(d, equal_to([2.0]))
+ self.p.run()
+
+ def test_mean_globally_runtime_checking_violated(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
+ | combine.Mean.Globally('mean'))
+ self.p.run()
+ self.assertEqual("Runtime type violation detected for transform input "
+ "when executing ParDoFlatMap(Combine): Tuple[Any, "
+ "Iterable[Union[int, float]]] hint type-constraint "
+ "violated. The type of element #1 in the passed tuple "
+ "is incorrect. Iterable[Union[int, float]] hint "
+ "type-constraint violated. The type of element #0 in "
+ "the passed Iterable is incorrect: Union[int, float] "
+ "type-constraint violated. Expected an instance of one "
+ "of: ('int', 'float'), received str instead.",
+ e.exception.message)
+
+ def test_mean_per_key_pipeline_checking_satisfied(self):
+ d = (self.p
+ | df.Create('c', range(5)).with_output_types(int)
+ | (df.Map('even group', lambda x: (not x % 2, x))
+ .with_output_types(typehints.KV[bool, int]))
+ | combine.Mean.PerKey('even mean'))
+
+ self.assertCompatible(typehints.KV[bool, float], d.element_type)
+ assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
+ self.p.run()
+
+ def test_mean_per_key_pipeline_checking_violated(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('e', map(str, range(5))).with_output_types(str)
+ | (df.Map('upper pair', lambda x: (x.upper(), x))
+ .with_output_types(typehints.KV[str, str]))
+ | combine.Mean.PerKey('even mean'))
+ self.p.run()
+
+ self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+ "requires Tuple[TypeVariable[K], "
+ "Iterable[Union[float, int, long]]] "
+ "but got Tuple[str, Iterable[str]] for p_context",
+ e.exception.message)
+
+ def test_mean_per_key_runtime_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('c', range(5)).with_output_types(int)
+ | (df.Map('odd group', lambda x: (bool(x % 2), x))
+ .with_output_types(typehints.KV[bool, int]))
+ | combine.Mean.PerKey('odd mean'))
+
+ self.assertCompatible(typehints.KV[bool, float], d.element_type)
+ assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
+ self.p.run()
+
+ def test_mean_per_key_runtime_checking_violated(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('c', range(5)).with_output_types(int)
+ | (df.Map('odd group', lambda x: (x, str(bool(x % 2))))
+ .with_output_types(typehints.KV[int, str]))
+ | combine.Mean.PerKey('odd mean'))
+ self.p.run()
+
+ self.assertStartswith(
+ e.exception.message,
+ "Runtime type violation detected within "
+ "ParDo(odd mean/CombinePerKey(MeanCombineFn)/"
+ "Combine/ParDo(CombineValuesDoFn)): "
+ "Type-hint for argument: 'p_context' violated: "
+ "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+ " hint type-constraint violated. "
+ "The type of element #1 in the passed tuple is incorrect. "
+ "Iterable[Union[float, int, long]] "
+ "hint type-constraint violated. The type of element #0 "
+ "in the passed Iterable is incorrect: "
+ "Union[float, int, long] type-constraint violated. "
+ "Expected an instance of one of: "
+ "('float', 'int', 'long'), received str instead.")
+
+ def test_count_globally_pipeline_type_checking_satisfied(self):
+ d = (self.p
+ | df.Create('p', range(5)).with_output_types(int)
+ | combine.Count.Globally('count int'))
+
+ self.assertTrue(d.element_type is int)
+ assert_that(d, equal_to([5]))
+ self.p.run()
+
+ def test_count_globally_runtime_type_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('p', range(5)).with_output_types(int)
+ | combine.Count.Globally('count int'))
+
+ self.assertTrue(d.element_type is int)
+ assert_that(d, equal_to([5]))
+ self.p.run()
+
+ def test_count_perkey_pipeline_type_checking_satisfied(self):
+ d = (self.p
+ | df.Create('p', range(5)).with_output_types(int)
+ | (df.Map('even group', lambda x: (not x % 2, x))
+ .with_output_types(typehints.KV[bool, int]))
+ | combine.Count.PerKey('count int'))
+
+ self.assertCompatible(typehints.KV[bool, int], d.element_type)
+ assert_that(d, equal_to([(False, 2), (True, 3)]))
+ self.p.run()
+
+ def test_count_perkey_pipeline_type_checking_violated(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('p', range(5)).with_output_types(int)
+ | combine.Count.PerKey('count int'))
+
+ self.assertEqual("Input type hint violation at GroupByKey: "
+ "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+ "got <type 'int'>",
+ e.exception.message)
+
+ def test_count_perkey_runtime_type_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
+         | (df.Map('dup key', lambda x: (x, x))
+            .with_output_types(typehints.KV[str, str]))
+ | combine.Count.PerKey('count dups'))
+
+ self.assertCompatible(typehints.KV[str, int], d.element_type)
+ assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
+ self.p.run()
+
+ def test_count_perelement_pipeline_type_checking_satisfied(self):
+ d = (self.p
+ | df.Create('w', [1, 1, 2, 3]).with_output_types(int)
+ | combine.Count.PerElement('count elems'))
+
+ self.assertCompatible(typehints.KV[int, int], d.element_type)
+ assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
+ self.p.run()
+
+ def test_count_perelement_pipeline_type_checking_violated(self):
+ self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ (self.p
+ | df.Create('f', [1, 1, 2, 3])
+ | combine.Count.PerElement('count elems'))
+
+ self.assertEqual('Pipeline type checking is enabled, however no output '
+ 'type-hint was found for the PTransform '
+ 'Create(f)',
+ e.exception.message)
+
+ def test_count_perelement_runtime_type_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+         | (df.Create('w', [True, True, False, True, True])
+            .with_output_types(bool))
+ | combine.Count.PerElement('count elems'))
+
+ self.assertCompatible(typehints.KV[bool, int], d.element_type)
+ assert_that(d, equal_to([(False, 1), (True, 4)]))
+ self.p.run()
+
+ def test_top_of_pipeline_checking_satisfied(self):
+ d = (self.p
+ | df.Create('n', range(5, 11)).with_output_types(int)
+ | combine.Top.Of('top 3', 3, lambda x, y: x < y))
+
+ self.assertCompatible(typehints.Iterable[int],
+ d.element_type)
+ assert_that(d, equal_to([[10, 9, 8]]))
+ self.p.run()
+
+ def test_top_of_runtime_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('n', list('testing')).with_output_types(str)
+         | combine.Top.Of('ascii top', 3, lambda x, y: x < y))
+
+ self.assertCompatible(typehints.Iterable[str], d.element_type)
+ assert_that(d, equal_to([['t', 't', 's']]))
+ self.p.run()
+
+ def test_per_key_pipeline_checking_violated(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+       (self.p
+        | df.Create('n', range(100)).with_output_types(int)
+        | df.Map('num + 1', lambda x: x + 1).with_output_types(int)
+        | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+
+ self.assertEqual("Input type hint violation at GroupByKey: "
+ "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+ "got <type 'int'>",
+ e.exception.message)
+
+ def test_per_key_pipeline_checking_satisfied(self):
+ d = (self.p
+ | df.Create('n', range(100)).with_output_types(int)
+ | (df.Map('group mod 3', lambda x: (x % 3, x))
+ .with_output_types(typehints.KV[int, int]))
+ | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+
+ self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
+ d.element_type)
+ assert_that(d, equal_to([(0, [99]), (1, [97]), (2, [98])]))
+ self.p.run()
+
+ def test_per_key_runtime_checking_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('n', range(21))
+ | (df.Map('group mod 3', lambda x: (x % 3, x))
+ .with_output_types(typehints.KV[int, int]))
+ | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+
+ self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+ d.element_type)
+ assert_that(d, equal_to([(0, [18]), (1, [19]), (2, [20])]))
+ self.p.run()
+
+ def test_sample_globally_pipeline_satisfied(self):
+ d = (self.p
+ | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
+ | combine.Sample.FixedSizeGlobally('sample', 3))
+
+ self.assertCompatible(typehints.Iterable[int], d.element_type)
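+     # The sampled contents are nondeterministic, so the matcher below checks
+     # only the size of the sampled output, not its elements.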
+ def matcher(expected_len):
+ def match(actual):
+ equal_to([expected_len])([len(actual[0])])
+ return match
+ assert_that(d, matcher(3))
+ self.p.run()
+
+ def test_sample_globally_runtime_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
+ | combine.Sample.FixedSizeGlobally('sample', 2))
+
+ self.assertCompatible(typehints.Iterable[int], d.element_type)
+ def matcher(expected_len):
+ def match(actual):
+ equal_to([expected_len])([len(actual[0])])
+ return match
+ assert_that(d, matcher(2))
+ self.p.run()
+
+ def test_sample_per_key_pipeline_satisfied(self):
+ d = (self.p
+ | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+ .with_output_types(typehints.KV[int, int]))
+ | combine.Sample.FixedSizePerKey('sample', 2))
+
+ self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+ d.element_type)
+ def matcher(expected_len):
+ def match(actual):
+ for _, sample in actual:
+ equal_to([expected_len])([len(sample)])
+ return match
+ assert_that(d, matcher(2))
+ self.p.run()
+
+ def test_sample_per_key_runtime_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+ .with_output_types(typehints.KV[int, int]))
+ | combine.Sample.FixedSizePerKey('sample', 1))
+
+ self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+ d.element_type)
+ def matcher(expected_len):
+ def match(actual):
+ for _, sample in actual:
+ equal_to([expected_len])([len(sample)])
+ return match
+ assert_that(d, matcher(1))
+ self.p.run()
+
+ def test_to_list_pipeline_check_satisfied(self):
+ d = (self.p
+ | df.Create('c', (1, 2, 3, 4)).with_output_types(int)
+ | combine.ToList('to list'))
+
+ self.assertCompatible(typehints.List[int], d.element_type)
+ def matcher(expected):
+ def match(actual):
+ equal_to(expected)(actual[0])
+ return match
+ assert_that(d, matcher([1, 2, 3, 4]))
+ self.p.run()
+
+ def test_to_list_runtime_check_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | df.Create('c', list('test')).with_output_types(str)
+ | combine.ToList('to list'))
+
+ self.assertCompatible(typehints.List[str], d.element_type)
+ def matcher(expected):
+ def match(actual):
+ equal_to(expected)(actual[0])
+ return match
+ assert_that(d, matcher(['e', 's', 't', 't']))
+ self.p.run()
+
+ def test_to_dict_pipeline_check_violated(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+       (self.p
+        | df.Create('d', [1, 2, 3, 4]).with_output_types(int)
+        | combine.ToDict('to dict'))
+
+ self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+ "requires Tuple[TypeVariable[K], "
+ "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
+ "but got Tuple[None, Iterable[int]] for p_context",
+ e.exception.message)
+
+ def test_to_dict_pipeline_check_satisfied(self):
+ d = (self.p
+ | df.Create(
+ 'd',
+ [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
+ | combine.ToDict('to dict'))
+
+ self.assertCompatible(typehints.Dict[int, int], d.element_type)
+ assert_that(d, equal_to([{1: 2, 3: 4}]))
+ self.p.run()
+
+ def test_to_dict_runtime_check_satisfied(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ d = (self.p
+ | (df.Create('d', [('1', 2), ('3', 4)])
+ .with_output_types(typehints.Tuple[str, int]))
+ | combine.ToDict('to dict'))
+
+ self.assertCompatible(typehints.Dict[str, int], d.element_type)
+ assert_that(d, equal_to([{'1': 2, '3': 4}]))
+ self.p.run()
+
+ def test_runtime_type_check_python_type_error(self):
+ self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+ with self.assertRaises(TypeError) as e:
+ (self.p
+ | df.Create('t', [1, 2, 3]).with_output_types(int)
+ | df.Map('len', lambda x: len(x)).with_output_types(int))
+ self.p.run()
+
+ # Our special type-checking related TypeError shouldn't have been raised.
+ # Instead the above pipeline should have triggered a regular Python runtime
+ # TypeError.
+ self.assertEqual("object of type 'int' has no len() [while running 'len']",
+ e.exception.message)
+ self.assertFalse(isinstance(e, typehints.TypeCheckError))
+
+ def test_pardo_type_inference(self):
+ self.assertEqual(int,
+ df.Filter(lambda x: False).infer_output_type(int))
+ self.assertEqual(typehints.Tuple[str, int],
+ df.Map(lambda x: (x, 1)).infer_output_type(str))
+
+ def test_gbk_type_inference(self):
+ self.assertEqual(
+ typehints.Tuple[str, typehints.Iterable[int]],
+ df.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
+
+ def test_pipeline_inference(self):
+ created = self.p | df.Create('c', ['a', 'b', 'c'])
+ mapped = created | df.Map('pair with 1', lambda x: (x, 1))
+ grouped = mapped | df.GroupByKey()
+ self.assertEqual(str, created.element_type)
+ self.assertEqual(typehints.KV[str, int], mapped.element_type)
+ self.assertEqual(typehints.KV[str, typehints.Iterable[int]],
+ grouped.element_type)
+
+ def test_inferred_bad_kv_type(self):
+ with self.assertRaises(typehints.TypeCheckError) as e:
+ _ = (self.p
+ | df.Create('t', ['a', 'b', 'c'])
+ | df.Map('ungroupable', lambda x: (x, 0, 1.0))
+ | df.GroupByKey())
+
+ self.assertEqual('Input type hint violation at GroupByKey: '
+ 'expected Tuple[TypeVariable[K], TypeVariable[V]], '
+ 'got Tuple[str, int, float]',
+ e.exception.message)
+
+ def test_type_inference_command_line_flag_toggle(self):
+ self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ x = self.p | df.Create('t', [1, 2, 3, 4])
+ self.assertIsNone(x.element_type)
+
+ self.p.options.view_as(TypeOptions).pipeline_type_check = True
+ x = self.p | df.Create('m', [1, 2, 3, 4])
+ self.assertEqual(int, x.element_type)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
new file mode 100644
index 0000000..b8efe82
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -0,0 +1,145 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Internal side input transforms and implementations.
+
+Important: this module is an implementation detail and should not be used
+directly by pipeline writers. Instead, users should use the helper methods
+AsSingleton, AsIter, AsList and AsDict in google.cloud.dataflow.pvalue.
+"""
+
+from __future__ import absolute_import
+
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.transforms.ptransform import PTransform
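+
+# Illustrative sketch of the intended entry points, assuming a pipeline `p`,
+# `google.cloud.dataflow` imported as `df`, and that Map accepts side-input
+# arguments after the element (a sketch, not a statement of the Map API):
+#
+#   main = p | df.Create('main', [1, 2, 3])
+#   side = p | df.Create('side', [10])
+#   out = main | df.Map('add', lambda x, s: x + s, pvalue.AsSingleton(side))
+#
+# pvalue.AsSingleton(side) materializes `side` as a singleton view, so every
+# element of `main` sees the single value 10; AsIter, AsList and AsDict are
+# analogous and use the ViewAs* transforms below.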
+
+
+class CreatePCollectionView(PTransform):
+ """Transform to materialize a given PCollectionView in the pipeline.
+
+ Important: this transform is an implementation detail and should not be used
+ directly by pipeline writers.
+ """
+
+ def __init__(self, view):
+ self.view = view
+ super(CreatePCollectionView, self).__init__()
+
+ def infer_output_type(self, input_type):
+ # TODO(ccy): Figure out if we want to create a new type of type hint, i.e.,
+ # typehints.View[...].
+ return input_type
+
+ def apply(self, pcoll):
+ return self.view
+
+
+class ViewAsSingleton(PTransform):
+ """Transform to view PCollection as a singleton PCollectionView.
+
+ Important: this transform is an implementation detail and should not be used
+ directly by pipeline writers. Use pvalue.AsSingleton(...) instead.
+ """
+
+ def __init__(self, has_default, default_value, label=None):
+ if label:
+ label = 'ViewAsSingleton(%s)' % label
+ super(ViewAsSingleton, self).__init__(label=label)
+ self.has_default = has_default
+ self.default_value = default_value
+
+ def apply(self, pcoll):
+ self._check_pcollection(pcoll)
+ input_type = pcoll.element_type
+ output_type = input_type
+ return (pcoll
+ | CreatePCollectionView(
+ pvalue.SingletonPCollectionView(
+ pcoll.pipeline, self.has_default, self.default_value))
+ .with_input_types(input_type)
+ .with_output_types(output_type))
+
+
+class ViewAsIterable(PTransform):
+ """Transform to view PCollection as an iterable PCollectionView.
+
+ Important: this transform is an implementation detail and should not be used
+ directly by pipeline writers. Use pvalue.AsIter(...) instead.
+ """
+
+ def __init__(self, label=None):
+ if label:
+ label = 'ViewAsIterable(%s)' % label
+ super(ViewAsIterable, self).__init__(label=label)
+
+ def apply(self, pcoll):
+ self._check_pcollection(pcoll)
+ input_type = pcoll.element_type
+ output_type = typehints.Iterable[input_type]
+ return (pcoll
+ | CreatePCollectionView(
+ pvalue.IterablePCollectionView(pcoll.pipeline))
+ .with_input_types(input_type)
+ .with_output_types(output_type))
+
+
+class ViewAsList(PTransform):
+ """Transform to view PCollection as a list PCollectionView.
+
+ Important: this transform is an implementation detail and should not be used
+ directly by pipeline writers. Use pvalue.AsList(...) instead.
+ """
+
+ def __init__(self, label=None):
+ if label:
+ label = 'ViewAsList(%s)' % label
+ super(ViewAsList, self).__init__(label=label)
+
+ def apply(self, pcoll):
+ self._check_pcollection(pcoll)
+ input_type = pcoll.element_type
+ output_type = typehints.List[input_type]
+ return (pcoll
+ | CreatePCollectionView(pvalue.ListPCollectionView(pcoll.pipeline))
+ .with_input_types(input_type)
+ .with_output_types(output_type))
+
+
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
+@typehints.with_input_types(typehints.Tuple[K, V])
+@typehints.with_output_types(typehints.Dict[K, V])
+class ViewAsDict(PTransform):
+ """Transform to view PCollection as a dict PCollectionView.
+
+ Important: this transform is an implementation detail and should not be used
+ directly by pipeline writers. Use pvalue.AsDict(...) instead.
+ """
+
+ def __init__(self, label=None):
+ if label:
+ label = 'ViewAsDict(%s)' % label
+ super(ViewAsDict, self).__init__(label=label)
+
+ def apply(self, pcoll):
+ self._check_pcollection(pcoll)
+ input_type = pcoll.element_type
+ key_type, value_type = (
+ typehints.trivial_inference.key_value_types(input_type))
+ output_type = typehints.Dict[key_type, value_type]
+ return (pcoll
+ | CreatePCollectionView(
+ pvalue.DictPCollectionView(pcoll.pipeline))
+ .with_input_types(input_type)
+ .with_output_types(output_type))
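+
+   # Illustrative sketch: for a KV-typed input such as a PCollection of
+   # ('a', 1) and ('b', 2) pairs hinted as Tuple[str, int], key_value_types
+   # above extracts (str, int), so the view is hinted as Dict[str, int].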
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
new file mode 100644
index 0000000..7b750f9
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -0,0 +1,310 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Time and timer utilities."""
+
+from __future__ import absolute_import
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import datetime
+import sys
+
+
+class Timestamp(object):
+ """Represents a Unix second timestamp with microsecond granularity.
+
+ Can be treated in common timestamp arithmetic operations as a numeric type.
+
+ Internally stores a time interval as an int of microseconds. This strategy
+ is necessary since floating point values lose precision when storing values,
+ especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
+ to 0.0999999994448885).
+ """
+
+ def __init__(self, seconds=0, micros=0):
+ self.micros = int(seconds * 1000000) + int(micros)
+
+ @staticmethod
+ def of(seconds):
+ """Return the Timestamp for the given number of seconds.
+
+ If the input is already a Timestamp, the input itself will be returned.
+
+ Args:
+ seconds: Number of seconds as int, float or Timestamp.
+
+ Returns:
+ Corresponding Timestamp object.
+ """
+
+ if isinstance(seconds, Duration):
+ raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
+ if isinstance(seconds, Timestamp):
+ return seconds
+ return Timestamp(seconds)
+
+ def __repr__(self):
+ micros = self.micros
+ sign = ''
+ if micros < 0:
+ sign = '-'
+ micros = -micros
+ int_part = micros / 1000000
+ frac_part = micros % 1000000
+ if frac_part:
+ return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
+ else:
+ return 'Timestamp(%s%d)' % (sign, int_part)
+
+ def to_utc_datetime(self):
+ epoch = datetime.datetime.utcfromtimestamp(0)
+ # We can't easily construct a datetime object from microseconds, so we
+ # create one at the epoch and add an appropriate timedelta interval.
+ return epoch + datetime.timedelta(microseconds=self.micros)
+
+ def isoformat(self):
+ # Append 'Z' for UTC timezone.
+ return self.to_utc_datetime().isoformat() + 'Z'
+
+ def __float__(self):
+ # Note that the returned value may have lost precision.
+ return float(self.micros) / 1000000
+
+ def __int__(self):
+ # Note that the returned value may have lost precision.
+ return self.micros / 1000000
+
+ def __cmp__(self, other):
+ # Allow comparisons between Duration and Timestamp values.
+ if not isinstance(other, Duration):
+ other = Timestamp.of(other)
+ return cmp(self.micros, other.micros)
+
+ def __hash__(self):
+ return hash(self.micros)
+
+ def __add__(self, other):
+ other = Duration.of(other)
+ return Timestamp(micros=self.micros + other.micros)
+
+ def __radd__(self, other):
+ return self + other
+
+ def __sub__(self, other):
+ other = Duration.of(other)
+ return Timestamp(micros=self.micros - other.micros)
+
+ def __mod__(self, other):
+ other = Duration.of(other)
+ return Duration(micros=self.micros % other.micros)
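+
+   # Illustrative sketch of the exact integer arithmetic defined above:
+   #
+   #   Timestamp.of(1.5) + Duration.of(0.25)  # -> Timestamp(1.750000)
+   #   Timestamp(10000000) % Duration(0.1)    # -> Duration(0), exactly,
+   #                                          #    with no float rounding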
+
+
+MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
+MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
+
+
+class Duration(object):
+ """Represents a second duration with microsecond granularity.
+
+ Can be treated in common arithmetic operations as a numeric type.
+
+ Internally stores a time interval as an int of microseconds. This strategy
+ is necessary since floating point values lose precision when storing values,
+ especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
+ to 0.0999999994448885).
+ """
+
+ def __init__(self, seconds=0, micros=0):
+ self.micros = int(seconds * 1000000) + int(micros)
+
+ @staticmethod
+ def of(seconds):
+ """Return the Duration for the given number of seconds since Unix epoch.
+
+ If the input is already a Duration, the input itself will be returned.
+
+ Args:
+ seconds: Number of seconds as int, float or Duration.
+
+ Returns:
+ Corresponding Duration object.
+ """
+
+ if isinstance(seconds, Timestamp):
+ raise TypeError('Can\'t interpret %s as Duration.' % seconds)
+ if isinstance(seconds, Duration):
+ return seconds
+ return Duration(seconds)
+
+ def __repr__(self):
+ micros = self.micros
+ sign = ''
+ if micros < 0:
+ sign = '-'
+ micros = -micros
+ int_part = micros / 1000000
+ frac_part = micros % 1000000
+ if frac_part:
+ return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part)
+ else:
+ return 'Duration(%s%d)' % (sign, int_part)
+
+ def __float__(self):
+ # Note that the returned value may have lost precision.
+ return float(self.micros) / 1000000
+
+ def __int__(self):
+ # Note that the returned value may have lost precision.
+ return self.micros / 1000000
+
+ def __cmp__(self, other):
+ # Allow comparisons between Duration and Timestamp values.
+ if not isinstance(other, Timestamp):
+ other = Duration.of(other)
+ return cmp(self.micros, other.micros)
+
+ def __hash__(self):
+ return hash(self.micros)
+
+ def __neg__(self):
+ return Duration(micros=-self.micros)
+
+ def __add__(self, other):
+ if isinstance(other, Timestamp):
+ return other + self
+ other = Duration.of(other)
+ return Duration(micros=self.micros + other.micros)
+
+ def __radd__(self, other):
+ return self + other
+
+ def __sub__(self, other):
+ other = Duration.of(other)
+ return Duration(micros=self.micros - other.micros)
+
+ def __rsub__(self, other):
+ return -(self - other)
+
+ def __mul__(self, other):
+ other = Duration.of(other)
+ return Duration(micros=self.micros * other.micros / 1000000)
+
+ def __rmul__(self, other):
+ return self * other
+
+ def __mod__(self, other):
+ other = Duration.of(other)
+ return Duration(micros=self.micros % other.micros)
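+
+   # Illustrative sketch of mixed arithmetic via the operators above:
+   #
+   #   Duration.of(1.5) * 2              # -> Duration(3)
+   #   5 - Duration.of(2)                # -> Duration(3), via __rsub__
+   #   Duration.of(2) + Timestamp.of(1)  # -> Timestamp(3), delegated by
+   #                                     #    __add__ to Timestamp.__add__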
+
+
+class TimeDomain(object):
+ """Time domain for streaming timers."""
+
+ WATERMARK = 'WATERMARK'
+ REAL_TIME = 'REAL_TIME'
+ DEPENDENT_REAL_TIME = 'DEPENDENT_REAL_TIME'
+
+ @staticmethod
+ def from_string(domain):
+ if domain in (TimeDomain.WATERMARK,
+ TimeDomain.REAL_TIME,
+ TimeDomain.DEPENDENT_REAL_TIME):
+ return domain
+ raise ValueError('Unknown time domain: %s' % domain)
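+
+   # For example, from_string('WATERMARK') returns 'WATERMARK' unchanged,
+   # while an unrecognized name such as 'EVENT_TIME' raises ValueError.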
+
+
+class OutputTimeFnImpl(object):
+ """Implementation of OutputTimeFn."""
+
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def assign_output_time(self, window, input_timestamp):
+ pass
+
+ @abstractmethod
+ def combine(self, output_timestamp, other_output_timestamp):
+ pass
+
+ def combine_all(self, merging_timestamps):
+ """Apply combine to list of timestamps."""
+ combined_output_time = None
+ for output_time in merging_timestamps:
+ if combined_output_time is None:
+ combined_output_time = output_time
+ else:
+ combined_output_time = self.combine(
+ combined_output_time, output_time)
+ return combined_output_time
+
+ def merge(self, unused_result_window, merging_timestamps):
+ """Default to returning the result of combine_all."""
+ return self.combine_all(merging_timestamps)
+
+
+class DependsOnlyOnWindow(OutputTimeFnImpl):
+ """OutputTimeFnImpl that only depends on the window."""
+
+ __metaclass__ = ABCMeta
+
+ def combine(self, output_timestamp, other_output_timestamp):
+ return output_timestamp
+
+ def merge(self, result_window, unused_merging_timestamps):
+ # Since we know that the result only depends on the window, we can ignore
+ # the given timestamps.
+ return self.assign_output_time(result_window, None)
+
+
+class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
+ """OutputTimeFnImpl outputting at earliest input timestamp."""
+
+ def assign_output_time(self, window, input_timestamp):
+ return input_timestamp
+
+ def combine(self, output_timestamp, other_output_timestamp):
+ """Default to returning the earlier of two timestamps."""
+ return min(output_timestamp, other_output_timestamp)
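+
+   # Illustrative sketch: combine_all on the base class folds combine over
+   # a list, so this implementation reduces a list to its minimum:
+   #
+   #   fn = OutputAtEarliestInputTimestampImpl()
+   #   fn.combine_all([Timestamp(3), Timestamp(1), Timestamp(2)])
+   #   # -> Timestamp(1)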
+
+
+class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
+   """OutputTimeFnImpl outputting at earliest transformed input timestamp."""
+
+ def __init__(self, window_fn):
+ self.window_fn = window_fn
+
+ def assign_output_time(self, window, input_timestamp):
+ return self.window_fn.get_transformed_output_time(window, input_timestamp)
+
+ def combine(self, output_timestamp, other_output_timestamp):
+ return min(output_timestamp, other_output_timestamp)
+
+
+class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
+ """OutputTimeFnImpl outputting at latest input timestamp."""
+
+ def assign_output_time(self, window, input_timestamp):
+ return input_timestamp
+
+ def combine(self, output_timestamp, other_output_timestamp):
+ return max(output_timestamp, other_output_timestamp)
+
+
+class OutputAtEndOfWindowImpl(DependsOnlyOnWindow):
+ """OutputTimeFnImpl outputting at end of window."""
+
+ def assign_output_time(self, window, unused_input_timestamp):
+ return window.end
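+
+   # Illustrative sketch with a hypothetical stand-in window that has an
+   # `end` attribute: merge (from DependsOnlyOnWindow) ignores the given
+   # timestamps and re-derives the output time from the window alone:
+   #
+   #   class _FakeWindow(object):
+   #     end = Timestamp(60)
+   #
+   #   OutputAtEndOfWindowImpl().merge(_FakeWindow(), [Timestamp(1)])
+   #   # -> Timestamp(60)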