You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:05 UTC

[30/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
new file mode 100644
index 0000000..00b6c8d
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -0,0 +1,1814 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the PTransform and descendants."""
+
+from __future__ import absolute_import
+
+import operator
+import re
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pipeline import Pipeline
+import google.cloud.dataflow.pvalue as pvalue
+import google.cloud.dataflow.transforms.combiners as combine
+from google.cloud.dataflow.transforms.ptransform import PTransform
+from google.cloud.dataflow.transforms.util import assert_that, equal_to
+import google.cloud.dataflow.typehints as typehints
+from google.cloud.dataflow.typehints import with_input_types
+from google.cloud.dataflow.typehints import with_output_types
+from google.cloud.dataflow.typehints.typehints_test import TypeHintTestCase
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.options import TypeOptions
+
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+
+
+class PTransformTest(unittest.TestCase):
+
+  def assertStartswith(self, msg, prefix):
+    self.assertTrue(msg.startswith(prefix),
+                    '"%s" does not start with "%s"' % (msg, prefix))
+
+  def test_str(self):
+    self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
+                     str(PTransform()))
+
+    pa = Pipeline('DirectPipelineRunner')
+    res = pa | df.Create('a_label', [1, 2])
+    self.assertEqual('<Create(PTransform) label=[a_label]>',
+                     str(res.producer.transform))
+
+    pc = Pipeline('DirectPipelineRunner')
+    res = pc | df.Create('with_inputs', [1, 2])
+    inputs_tr = res.producer.transform
+    inputs_tr.inputs = ('ci',)
+    self.assertEqual(
+        """<Create(PTransform) label=[with_inputs] inputs=('ci',)>""",
+        str(inputs_tr))
+
+    pd = Pipeline('DirectPipelineRunner')
+    res = pd | df.Create('with_sidei', [1, 2])
+    side_tr = res.producer.transform
+    side_tr.side_inputs = (4,)
+    self.assertEqual(
+        '<Create(PTransform) label=[with_sidei] side_inputs=(4,)>',
+        str(side_tr))
+
+    inputs_tr.side_inputs = ('cs',)
+    self.assertEqual(
+        """<Create(PTransform) label=[with_inputs] """
+        """inputs=('ci',) side_inputs=('cs',)>""",
+        str(inputs_tr))
+
+  def test_parse_label_and_arg(self):
+
+    def fun(*args, **kwargs):
+      return PTransform().parse_label_and_arg(args, kwargs, 'name')
+
+    self.assertEqual(('PTransform', 'value'), fun('value'))
+    self.assertEqual(('PTransform', 'value'), fun(name='value'))
+    self.assertEqual(('label', 'value'), fun('label', 'value'))
+    self.assertEqual(('label', 'value'), fun('label', name='value'))
+    self.assertEqual(('label', 'value'), fun('value', label='label'))
+    self.assertEqual(('label', 'value'), fun(name='value', label='label'))
+
+    self.assertRaises(ValueError, fun)
+    self.assertRaises(ValueError, fun, 0, 'value')
+    self.assertRaises(ValueError, fun, label=0, name='value')
+    self.assertRaises(ValueError, fun, other='value')
+
+    with self.assertRaises(ValueError) as cm:
+      fun(0, name='value')
+    self.assertEqual(
+        cm.exception.message,
+        'PTransform expects a (label, name) or (name) argument list '
+        'instead of args=(0,), kwargs={\'name\': \'value\'}')
+
+  def test_do_with_do_fn(self):
+    class AddNDoFn(df.DoFn):
+
+      def process(self, context, addon):
+        return [context.element + addon]
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    result = pcoll | df.ParDo('do', AddNDoFn(), 10)
+    assert_that(result, equal_to([11, 12, 13]))
+    pipeline.run()
+
+  def test_do_with_unconstructed_do_fn(self):
+    class MyDoFn(df.DoFn):
+
+      def process(self, context):
+        pass
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    with self.assertRaises(ValueError):
+      pcoll | df.ParDo('do', MyDoFn)  # Note the lack of ()'s
+
+  def test_do_with_callable(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    result = pcoll | df.FlatMap('do', lambda x, addon: [x + addon], 10)
+    assert_that(result, equal_to([11, 12, 13]))
+    pipeline.run()
+
+  def test_do_with_side_input_as_arg(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    side = pipeline | df.Create('side', [10])
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    result = pcoll | df.FlatMap(
+        'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
+    assert_that(result, equal_to([11, 12, 13]))
+    pipeline.run()
+
+  def test_do_with_side_input_as_keyword_arg(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    side = pipeline | df.Create('side', [10])
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    result = pcoll | df.FlatMap(
+        'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
+    assert_that(result, equal_to([11, 12, 13]))
+    pipeline.run()
+
+  def test_do_with_do_fn_returning_string_raises_warning(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', ['2', '9', '3'])
+    pcoll | df.FlatMap('do', lambda x: x + '1')
+
+    # Since the DoFn directly returns a string we should get an error warning
+    # us.
+    with self.assertRaises(typehints.TypeCheckError) as cm:
+      pipeline.run()
+
+    expected_error_prefix = ('Returning a str from a ParDo or FlatMap '
+                             'is discouraged.')
+    self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+  def test_do_with_do_fn_returning_dict_raises_warning(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', ['2', '9', '3'])
+    pcoll | df.FlatMap('do', lambda x: {x: '1'})
+
+    # Since the DoFn directly returns a dict we should get an error warning
+    # us.
+    with self.assertRaises(typehints.TypeCheckError) as cm:
+      pipeline.run()
+
+    expected_error_prefix = ('Returning a dict from a ParDo or FlatMap '
+                             'is discouraged.')
+    self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+  def test_do_with_side_outputs_maintains_unique_name(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    r1 = pcoll | df.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
+    r2 = pcoll | df.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
+    assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
+    assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
+    pipeline.run()
+
+  def test_do_requires_do_fn_returning_iterable(self):
+    # This function is incorrect because it returns an object that isn't an
+    # iterable.
+    def incorrect_par_do_fn(x):
+      return x + 5
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [2, 9, 3])
+    pcoll | df.FlatMap('do', incorrect_par_do_fn)
+    # It's a requirement that all user-defined functions to a ParDo return
+    # an iterable.
+    with self.assertRaises(typehints.TypeCheckError) as cm:
+      pipeline.run()
+
+    expected_error_prefix = 'FlatMap and ParDo must return an iterable.'
+    self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+  def test_do_fn_with_start_finish(self):
+    class MyDoFn(df.DoFn):
+      def start_bundle(self, c):
+        yield 'start'
+      def process(self, c):
+        pass
+      def finish_bundle(self, c):
+        yield 'finish'
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3])
+    result = pcoll | df.ParDo('do', MyDoFn())
+
+    # May have many bundles, but each has a start and finish.
+    def  matcher():
+      def match(actual):
+        equal_to(['start', 'finish'])(list(set(actual)))
+        equal_to([actual.count('start')])([actual.count('finish')])
+      return match
+
+    assert_that(result, matcher())
+    pipeline.run()
+
+  def test_filter(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [1, 2, 3, 4])
+    result = pcoll | df.Filter(
+        'filter', lambda x: x % 2 == 0)
+    assert_that(result, equal_to([2, 4]))
+    pipeline.run()
+
+  class _MeanCombineFn(df.CombineFn):
+
+    def create_accumulator(self):
+      return (0, 0)
+
+    def add_input(self, (sum_, count), element):
+      return sum_ + element, count + 1
+
+    def merge_accumulators(self, accumulators):
+      sums, counts = zip(*accumulators)
+      return sum(sums), sum(counts)
+
+    def extract_output(self, (sum_, count)):
+      if not count:
+        return float('nan')
+      return sum_ / float(count)
+
+  def test_combine_with_combine_fn(self):
+    vals = [1, 2, 3, 4, 5, 6, 7]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', vals)
+    result = pcoll | df.CombineGlobally('mean', self._MeanCombineFn())
+    assert_that(result, equal_to([sum(vals) / len(vals)]))
+    pipeline.run()
+
+  def test_combine_with_callable(self):
+    vals = [1, 2, 3, 4, 5, 6, 7]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', vals)
+    result = pcoll | df.CombineGlobally(sum)
+    assert_that(result, equal_to([sum(vals)]))
+    pipeline.run()
+
+  def test_combine_with_side_input_as_arg(self):
+    values = [1, 2, 3, 4, 5, 6, 7]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', values)
+    divisor = pipeline | df.Create('divisor', [2])
+    result = pcoll | df.CombineGlobally(
+        'max',
+        # Multiples of divisor only.
+        lambda vals, d: max(v for v in vals if v % d == 0),
+        pvalue.AsSingleton(divisor)).without_defaults()
+    filt_vals = [v for v in values if v % 2 == 0]
+    assert_that(result, equal_to([max(filt_vals)]))
+    pipeline.run()
+
+  def test_combine_per_key_with_combine_fn(self):
+    vals_1 = [1, 2, 3, 4, 5, 6, 7]
+    vals_2 = [2, 4, 6, 8, 10, 12, 14]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+                                           [('b', x) for x in vals_2]))
+    result = pcoll | df.CombinePerKey('mean', self._MeanCombineFn())
+    assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
+                                  ('b', sum(vals_2) / len(vals_2))]))
+    pipeline.run()
+
+  def test_combine_per_key_with_callable(self):
+    vals_1 = [1, 2, 3, 4, 5, 6, 7]
+    vals_2 = [2, 4, 6, 8, 10, 12, 14]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+                                           [('b', x) for x in vals_2]))
+    result = pcoll | df.CombinePerKey(sum)
+    assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
+    pipeline.run()
+
+  def test_combine_per_key_with_side_input_as_arg(self):
+    vals_1 = [1, 2, 3, 4, 5, 6, 7]
+    vals_2 = [2, 4, 6, 8, 10, 12, 14]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
+                                           [('b', x) for x in vals_2]))
+    divisor = pipeline | df.Create('divisor', [2])
+    result = pcoll | df.CombinePerKey(
+        lambda vals, d: max(v for v in vals if v % d == 0),
+        pvalue.AsSingleton(divisor))  # Multiples of divisor only.
+    m_1 = max(v for v in vals_1 if v % 2 == 0)
+    m_2 = max(v for v in vals_2 if v % 2 == 0)
+    assert_that(result, equal_to([('a', m_1), ('b', m_2)]))
+    pipeline.run()
+
+  def test_group_by_key(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create(
+        'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
+    result = pcoll | df.GroupByKey('group')
+    assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
+    pipeline.run()
+
+  def test_partition_with_partition_fn(self):
+
+    class SomePartitionFn(df.PartitionFn):
+
+      def partition_for(self, context, num_partitions, offset):
+        return (context.element % 3) + offset
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    # Attempt nominal partition operation.
+    partitions = pcoll | df.Partition('part1', SomePartitionFn(), 4, 1)
+    assert_that(partitions[0], equal_to([]))
+    assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
+    assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
+    assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
+    pipeline.run()
+
+    # Check that a bad partition label will yield an error. For the
+    # DirectPipelineRunner, this error manifests as an exception.
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    partitions = pcoll | df.Partition('part2', SomePartitionFn(), 4, 10000)
+    with self.assertRaises(ValueError):
+      pipeline.run()
+
+  def test_partition_with_callable(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    partitions = (
+        pcoll | df.Partition(
+            'part',
+            lambda e, n, offset: (e % 3) + offset, 4,
+            1))
+    assert_that(partitions[0], equal_to([]))
+    assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
+    assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
+    assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
+    pipeline.run()
+
+  def test_partition_followed_by_flatten_and_groupbykey(self):
+    """Regression test for an issue with how partitions are handled."""
+    pipeline = Pipeline('DirectPipelineRunner')
+    contents = [('aa', 1), ('bb', 2), ('aa', 2)]
+    created = pipeline | df.Create('A', contents)
+    partitioned = created | df.Partition('B', lambda x, n: len(x) % n, 3)
+    flattened = partitioned | df.Flatten('C')
+    grouped = flattened | df.GroupByKey('D')
+    assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
+    pipeline.run()
+
+  def test_flatten_pcollections(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
+    pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
+    result = (pcoll_1, pcoll_2) | df.Flatten('flatten')
+    assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
+    pipeline.run()
+
+  def test_flatten_no_pcollections(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    with self.assertRaises(ValueError):
+      () | df.Flatten('pipeline arg missing')
+    result = () | df.Flatten('empty', pipeline=pipeline)
+    assert_that(result, equal_to([]))
+    pipeline.run()
+
+  def test_flatten_pcollections_in_iterable(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
+    pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
+    result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
+              | df.Flatten('flatten'))
+    assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
+    pipeline.run()
+
+  def test_flatten_input_type_must_be_iterable(self):
+    # Inputs to flatten *must* be an iterable.
+    with self.assertRaises(ValueError):
+      4 | df.Flatten('flatten')
+
+  def test_flatten_input_type_must_be_iterable_of_pcolls(self):
+    # Inputs to flatten *must* be an iterable of PCollections.
+    with self.assertRaises(TypeError):
+      {'l': 'test'} | df.Flatten('flatten')
+    with self.assertRaises(TypeError):
+      set([1, 2, 3]) | df.Flatten('flatten')
+
+  def test_co_group_by_key_on_list(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll_1 = pipeline | df.Create(
+        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | df.Create(
+        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = (pcoll_1, pcoll_2) | df.CoGroupByKey('cgbk')
+    assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
+                                  ('b', ([3], [])),
+                                  ('c', ([4], [7, 8]))]))
+    pipeline.run()
+
+  def test_co_group_by_key_on_iterable(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll_1 = pipeline | df.Create(
+        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | df.Create(
+        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = ([pc for pc in (pcoll_1, pcoll_2)]
+              | df.CoGroupByKey('cgbk'))
+    assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
+                                  ('b', ([3], [])),
+                                  ('c', ([4], [7, 8]))]))
+    pipeline.run()
+
+  def test_co_group_by_key_on_dict(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll_1 = pipeline | df.Create(
+        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | df.Create(
+        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = {'X': pcoll_1, 'Y': pcoll_2} | df.CoGroupByKey('cgbk')
+    assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
+                                  ('b', {'X': [3], 'Y': []}),
+                                  ('c', {'X': [4], 'Y': [7, 8]})]))
+    pipeline.run()
+
+  def test_group_by_key_input_must_be_kv_pairs(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcolls = pipeline | df.Create('A', [1, 2, 3, 4, 5])
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      pcolls | df.GroupByKey('D')
+      pipeline.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        'Runtime type violation detected within '
+        'ParDo(D/reify_windows): Input to GroupByKey must be '
+        'a PCollection with elements compatible with KV[A, B]')
+
+  def test_group_by_key_only_input_must_be_kv_pairs(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcolls = pipeline | df.Create('A', ['a', 'b', 'f'])
+    with self.assertRaises(typehints.TypeCheckError) as cm:
+      pcolls | df.GroupByKeyOnly('D')
+      pipeline.run()
+
+    expected_error_prefix = ('Input to GroupByKeyOnly must be a PCollection of '
+                             'windowed key-value pairs.')
+    self.assertStartswith(cm.exception.message, expected_error_prefix)
+
+  def test_keys_and_values(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create(
+        'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
+    keys = pcoll.apply('keys', df.Keys())
+    vals = pcoll.apply('vals', df.Values())
+    assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
+    assert_that(vals, equal_to([1, 1, 1, 2, 2, 3]), label='assert:vals')
+    pipeline.run()
+
+  def test_kv_swap(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create(
+        'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
+    result = pcoll.apply('swap', df.KvSwap())
+    assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
+    pipeline.run()
+
+  def test_remove_duplicates(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create(
+        'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+    result = pcoll.apply('nodupes', df.RemoveDuplicates())
+    assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
+    pipeline.run()
+
+  def test_chained_ptransforms(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    t = (df.Map(lambda x: (x, 1))
+         | df.GroupByKey()
+         | df.Map(lambda (x, ones): (x, sum(ones))))
+    result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
+    assert_that(result, equal_to([('a', 2), ('b', 1)]))
+    pipeline.run()
+
+  def test_apply_to_list(self):
+    self.assertEqual([1, 2, 3], [0, 1, 2] | df.Map('add_one', lambda x: x + 1))
+    self.assertEqual([1], [0, 1, 2] | df.Filter('odd', lambda x: x % 2))
+    self.assertEqual([1, 2, 3, 100],
+                     ([1, 2, 3], [100]) | df.Flatten('flat'))
+    join_input = ([('k', 'a')],
+                  [('k', 'b'), ('k', 'c')])
+    self.assertEqual([('k', (['a'], ['b', 'c']))],
+                     join_input | df.CoGroupByKey('join'))
+
+  def test_multi_input_ptransform(self):
+    class DisjointUnion(PTransform):
+      def apply(self, pcollections):
+        return (pcollections
+                | df.Flatten()
+                | df.Map(lambda x: (x, None))
+                | df.GroupByKey()
+                | df.Map(lambda (x, _): x))
+    self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
+
+  def test_apply_to_crazy_pvaluish(self):
+    class NestedFlatten(PTransform):
+      """A PTransform taking and returning nested PValueish.
+
+      Takes as input a list of dicts, and returns a dict with the corresponding
+      values flattened.
+      """
+      def _extract_input_pvalues(self, pvalueish):
+        pvalueish = list(pvalueish)
+        return pvalueish, sum([list(p.values()) for p in pvalueish], [])
+      def apply(self, pcoll_dicts):
+        keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
+        res = {}
+        for k in keys:
+          res[k] = [p[k] for p in pcoll_dicts if k in p] | df.Flatten(k)
+        return res
+    res = [{'a': [1, 2, 3]},
+           {'a': [4, 5, 6], 'b': ['x', 'y', 'z']},
+           {'a': [7, 8], 'b': ['x', 'y'], 'c': []}] | NestedFlatten()
+    self.assertEqual(3, len(res))
+    self.assertEqual([1, 2, 3, 4, 5, 6, 7, 8], sorted(res['a']))
+    self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b']))
+    self.assertEqual([], sorted(res['c']))
+
+@df.ptransform_fn
+def SamplePTransform(label, pcoll, context, *args, **kwargs):
+  """Sample transform using the @ptransform_fn decorator."""
+  _ = label, args, kwargs
+  map_transform = df.Map('ToPairs', lambda v: (v, None))
+  combine_transform = df.CombinePerKey('Group', lambda vs: None)
+  keys_transform = df.Keys('RemoveDuplicates')
+  context.extend([map_transform, combine_transform, keys_transform])
+  return pcoll | map_transform | combine_transform | keys_transform
+
+
+class PTransformLabelsTest(unittest.TestCase):
+
+  class CustomTransform(df.PTransform):
+    # Composite whose apply() adds one FlatMap labelled '*do*' (x -> [x + 1]).
+    pardo = None  # Replaced by apply() with the FlatMap it creates.
+
+    def apply(self, pcoll):
+      self.pardo = df.FlatMap('*do*', lambda x: [x + 1])
+      return pcoll | self.pardo
+
+  def test_chained_ptransforms(self):
+    """Tests that chaining gets proper nesting."""
+    pipeline = Pipeline('DirectPipelineRunner')
+    map1 = df.Map('map1', lambda x: (x, 1))
+    gbk = df.GroupByKey('gbk')
+    map2 = df.Map('map2', lambda (x, ones): (x, sum(ones)))
+    t = (map1 | gbk | map2)
+    result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
+    self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
+    self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
+    self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
+    assert_that(result, equal_to([('a', 2), ('b', 1)]))
+    pipeline.run()
+
+  def test_apply_custom_transform_without_label(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+    custom = PTransformLabelsTest.CustomTransform()
+    result = pipeline.apply(custom, pcoll)
+    self.assertTrue('CustomTransform' in pipeline.applied_labels)
+    self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels)
+    assert_that(result, equal_to([2, 3, 4]))
+    pipeline.run()
+
+  def test_apply_custom_transform_with_label(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+    custom = PTransformLabelsTest.CustomTransform('*custom*')
+    result = pipeline.apply(custom, pcoll)
+    self.assertTrue('*custom*' in pipeline.applied_labels)
+    self.assertTrue('*custom*/*do*' in pipeline.applied_labels)
+    assert_that(result, equal_to([2, 3, 4]))
+    pipeline.run()
+
+  def test_combine_without_label(self):
+    vals = [1, 2, 3, 4, 5, 6, 7]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', vals)
+    combine = df.CombineGlobally(sum)
+    result = pcoll | combine
+    self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
+    assert_that(result, equal_to([sum(vals)]))
+    pipeline.run()
+
+  def test_apply_ptransform_using_decorator(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
+    context = []
+    sample = SamplePTransform('*sample*', context)
+    _ = pcoll | sample
+    self.assertTrue('*sample*' in pipeline.applied_labels)
+    self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)
+    self.assertTrue('*sample*/Group' in pipeline.applied_labels)
+    self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels)
+
+  def test_combine_with_label(self):
+    vals = [1, 2, 3, 4, 5, 6, 7]
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | df.Create('start', vals)
+    combine = df.CombineGlobally('*sum*', sum)
+    result = pcoll | combine
+    self.assertTrue('*sum*' in pipeline.applied_labels)
+    assert_that(result, equal_to([sum(vals)]))
+    pipeline.run()
+
+  def check_label(self, ptransform, expected_label):
+    pipeline = Pipeline('DirectPipelineRunner')
+    pipeline | df.Create('start', [('a', 1)]) | ptransform
+    actual_label = sorted(pipeline.applied_labels - {'start'})[0]
+    self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
+
+  def test_default_labels(self):
+    self.check_label(df.Map(len), r'Map(len)')
+    self.check_label(df.Map(lambda x: x),
+                     r'Map(<lambda at ptransform_test.py:#>)')
+    self.check_label(df.FlatMap(list), r'FlatMap(list)')
+    self.check_label(df.Filter(sum), r'Filter(sum)')
+    self.check_label(df.CombineGlobally(sum), r'CombineGlobally(sum)')
+    self.check_label(df.CombinePerKey(sum), r'CombinePerKey(sum)')
+
+    class MyDoFn(df.DoFn):
+      def process(self, context):
+        pass
+
+    self.check_label(df.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
+
+
+class PTransformTypeCheckTestCase(TypeHintTestCase):
+
+  def assertStartswith(self, msg, prefix):
+    self.assertTrue(msg.startswith(prefix),
+                    '"%s" does not start with "%s"' % (msg, prefix))
+
+  def setUp(self):  # Fresh Pipeline per test, built from PipelineOptions([]).
+    self.p = Pipeline(options=PipelineOptions([]))
+
+  def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
+    @with_input_types(int, int)
+    @with_output_types(typehints.List[int])
+    class AddWithFive(df.DoFn):
+      def process(self, context, five):
+        return [context.element + five]
+
+    d = (self.p
+         | df.Create('t', [1, 2, 3]).with_output_types(int)
+         | df.ParDo('add', AddWithFive(), 5))
+
+    assert_that(d, equal_to([6, 7, 8]))
+    self.p.run()
+
+  def test_do_fn_pipeline_pipeline_type_check_violated(self):
+    @with_input_types(str, str)
+    @with_output_types(typehints.List[str])
+    class ToUpperCaseWithPrefix(df.DoFn):
+      def process(self, context, prefix):
+        return [prefix + context.element.upper()]
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('t', [1, 2, 3]).with_output_types(int)
+           | df.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
+
+    self.assertEqual("Type hint violation for 'upper': "
+                     "requires <type 'str'> but got <type 'int'> for context",
+                     e.exception.message)
+
+  def test_do_fn_pipeline_runtime_type_check_satisfied(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_input_types(int, int)
+    @with_output_types(int)
+    class AddWithNum(df.DoFn):
+      def process(self, context, num):
+        return [context.element + num]
+
+    d = (self.p
+         | df.Create('t', [1, 2, 3]).with_output_types(int)
+         | df.ParDo('add', AddWithNum(), 5))
+
+    assert_that(d, equal_to([6, 7, 8]))
+    self.p.run()
+
+  def test_do_fn_pipeline_runtime_type_check_violated(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_input_types(int, int)
+    @with_output_types(typehints.List[int])
+    class AddWithNum(df.DoFn):
+      def process(self, context, num):
+        return [context.element + num]
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('t', ['1', '2', '3']).with_output_types(str)
+           | df.ParDo('add', AddWithNum(), 5))
+      self.p.run()
+
+    self.assertEqual("Type hint violation for 'add': "
+                     "requires <type 'int'> but got <type 'str'> for context",
+                     e.exception.message)
+
+  def test_pardo_does_not_type_check_using_type_hint_decorators(self):
+    @with_input_types(a=int)
+    @with_output_types(typehints.List[str])
+    def int_to_str(a):
+      return [str(a)]
+
+    # The function above is expecting an int for its only parameter. However, it
+    # will receive a str instead, which should result in a raised exception.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      c = (self.p
+           | df.Create('s', ['b', 'a', 'r']).with_output_types(str)
+           | df.FlatMap('to str', int_to_str))
+
+    self.assertEqual("Type hint violation for 'to str': "
+                     "requires <type 'int'> but got <type 'str'> for a",
+                     e.exception.message)
+
+  def test_pardo_properly_type_checks_using_type_hint_decorators(self):
+    @with_input_types(a=str)
+    @with_output_types(typehints.List[str])
+    def to_all_upper_case(a):
+      return [a.upper()]
+
+    # If this type-checks then no error should be raised.
+    d = (self.p
+         | df.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
+         | df.FlatMap('case', to_all_upper_case))
+    assert_that(d, equal_to(['T', 'E', 'S', 'T']))
+    self.p.run()
+
+    # The hinted output is List[str], but FlatMap flattens it, so the element
+    # type of the resulting PCollection is expected to be str.
+    self.assertEqual(str, d.element_type)
+
+  def test_pardo_does_not_type_check_using_type_hint_methods(self):
+    # The first ParDo outputs pcoll's of type int, however the second ParDo is
+    # expecting pcoll's of type str instead.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+       | (df.FlatMap('score', lambda x: [1] if x == 't' else [2])
+          .with_input_types(str).with_output_types(int))
+       | (df.FlatMap('upper', lambda x: [x.upper()])
+          .with_input_types(str).with_output_types(str)))
+
+    self.assertEqual("Type hint violation for 'upper': "
+                     "requires <type 'str'> but got <type 'int'> for x",
+                     e.exception.message)
+
+  def test_pardo_properly_type_checks_using_type_hint_methods(self):
+    # Pipeline should be created successfully without an error
+    d = (self.p
+         | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+         | df.FlatMap('dup', lambda x: [x + x])
+            .with_input_types(str).with_output_types(str)
+         | df.FlatMap('upper', lambda x: [x.upper()])
+            .with_input_types(str).with_output_types(str))
+
+    assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
+    self.p.run()
+
+  def test_map_does_not_type_check_using_type_hints_methods(self):
+    # The transform before 'Map' has indicated that it outputs PCollections with
+    # int's, while Map is expecting one of str.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+           | df.Map('upper', lambda x: x.upper()).with_input_types(str).with_output_types(str))
+
+    self.assertEqual("Type hint violation for 'upper': "
+                     "requires <type 'str'> but got <type 'int'> for x",
+                     e.exception.message)
+
+  def test_map_properly_type_checks_using_type_hints_methods(self):
+    # No error should be raised if this type-checks properly.
+    d = (self.p
+         | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+         | df.Map('to_str', lambda x: str(x)).with_input_types(int).with_output_types(str))
+    assert_that(d, equal_to(['1', '2', '3', '4']))
+    self.p.run()
+
+  def test_map_does_not_type_check_using_type_hints_decorator(self):
+    @with_input_types(s=str)
+    @with_output_types(str)
+    def upper(s):
+      return s.upper()
+
+    # Hinted function above expects a str at pipeline construction.
+    # However, 'Map' should detect that Create has hinted an int instead.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
+           | df.Map('upper', upper))
+
+    self.assertEqual("Type hint violation for 'upper': "
+                     "requires <type 'str'> but got <type 'int'> for s",
+                     e.exception.message)
+
+  def test_map_properly_type_checks_using_type_hints_decorator(self):
+    @with_input_types(a=bool)
+    @with_output_types(int)
+    def bool_to_int(a):
+      return int(a)
+
+    # If this type-checks than no error should be raised.
+    d = (self.p
+         | df.Create('bools', [True, False, True]).with_output_types(bool)
+         | df.Map('to_ints', bool_to_int))
+    assert_that(d, equal_to([1, 0, 1]))
+    self.p.run()
+
+  def test_filter_does_not_type_check_using_type_hints_method(self):
+    # Filter is expecting an int but instead looks to the 'left' and sees a str
+    # incoming.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
+       | df.Map('lower', lambda x: x.lower()).with_input_types(str).with_output_types(str)
+       | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
+
+    self.assertEqual("Type hint violation for 'below 3': "
+                     "requires <type 'int'> but got <type 'str'> for x",
+                     e.exception.message)
+
+  def test_filter_type_checks_using_type_hints_method(self):
+    # No error should be raised if this type-checks properly.
+    d = (self.p
+         | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
+         | df.Map('to int', lambda x: int(x)).with_input_types(str).with_output_types(int)
+         | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
+    assert_that(d, equal_to([1, 2]))
+    self.p.run()
+
+  def test_filter_does_not_type_check_using_type_hints_decorator(self):
+    @with_input_types(a=float)
+    def more_than_half(a):
+      return a > 0.50
+
+    # Func above was hinted to only take a float, yet an int will be passed.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('ints', [1, 2, 3, 4]).with_output_types(int)
+           | df.Filter('half', more_than_half))
+
+    self.assertEqual("Type hint violation for 'half': "
+                     "requires <type 'float'> but got <type 'int'> for a",
+                     e.exception.message)
+
+  def test_filter_type_checks_using_type_hints_decorator(self):
+    @with_input_types(b=int)
+    def half(b):
+      import random
+      return bool(random.choice([0, 1]))
+
+    # Filter should deduce that it returns the same type that it takes.
+    (self.p
+     | df.Create('str', range(5)).with_output_types(int)
+     | df.Filter('half', half)
+     | df.Map('to bool', lambda x: bool(x)).with_input_types(int).with_output_types(bool))
+
+  def test_group_by_key_only_output_type_deduction(self):
+    d = (self.p
+         | df.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
+         | (df.Map('pair', lambda x: (x, ord(x)))
+            .with_output_types(typehints.KV[str, str]))
+         | df.GroupByKeyOnly('O'))
+
+    # Output type should correctly be deduced.
+    # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
+    self.assertCompatible(typehints.KV[str, typehints.Iterable[str]],
+                          d.element_type)
+
+  def test_group_by_key_output_type_deduction(self):
+    d = (self.p
+         | df.Create('str', range(20)).with_output_types(int)
+         | (df.Map('pair negative', lambda x: (x % 5, -x))
+            .with_output_types(typehints.KV[int, int]))
+         | df.GroupByKey('T'))
+
+    # Output type should correctly be deduced.
+    # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
+    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+                          d.element_type)
+
+  def test_group_by_key_only_does_not_type_check(self):
+    # GBK will be passed raw int's here instead of some form of KV[A, B].
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('s', [1, 2, 3]).with_output_types(int)
+           | df.GroupByKeyOnly('F'))
+
+    self.assertEqual("Input type hint violation at F: "
+                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+                     "got <type 'int'>",
+                     e.exception.message)
+
+  def test_group_by_does_not_type_check(self):
+    # Create is returning a List[int, str], rather than a KV[int, str] that is
+    # aliased to Tuple[int, str].
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | (df.Create('s', range(5))
+              .with_output_types(typehints.Iterable[int]))
+           | df.GroupByKey('T'))
+
+    self.assertEqual("Input type hint violation at T: "
+                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+                     "got Iterable[int]",
+                     e.exception.message)
+
+  def test_pipeline_checking_pardo_insufficient_type_information(self):
+    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+    # Type checking is enabled, but 'Create' doesn't pass on any relevant type
+    # information to the ParDo.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('nums', range(5))
+       | df.FlatMap('mod dup', lambda x: (x % 2, x)))
+
+    self.assertEqual('Pipeline type checking is enabled, however no output '
+                     'type-hint was found for the PTransform Create(nums)',
+                     e.exception.message)
+
+  def test_pipeline_checking_gbk_insufficient_type_information(self):
+    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+    # Type checking is enabled, but 'Map' doesn't pass on any relevant type
+    # information to GBK-only.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('nums', range(5)).with_output_types(int)
+       | df.Map('mod dup', lambda x: (x % 2, x))
+       | df.GroupByKeyOnly('G'))
+
+    self.assertEqual('Pipeline type checking is enabled, however no output '
+                     'type-hint was found for the PTransform '
+                     'ParDo(mod dup)',
+                     e.exception.message)
+
+  def test_disable_pipeline_type_check(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The pipeline below should raise a TypeError, however pipeline type
+    # checking was disabled above.
+    (self.p
+     | df.Create('t', [1, 2, 3]).with_output_types(int)
+     | df.Map('lower', lambda x: x.lower()).with_input_types(str).with_output_types(str))
+
+  def test_run_time_type_checking_enabled_type_violation(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_output_types(str)
+    @with_input_types(x=int)
+    def int_to_string(x):
+      return str(x)
+
+    # Function above has been type-hinted to only accept an int. But in the
+    # pipeline execution it'll be passed a string due to the output of Create.
+    (self.p
+     | df.Create('t', ['some_string'])
+     | df.Map('to str', int_to_string))
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(to str): "
+        "Type-hint for argument: 'x' violated. "
+        "Expected an instance of <type 'int'>, "
+        "instead found some_string, an instance of <type 'str'>.")
+
+  def test_run_time_type_checking_enabled_types_satisfied(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_output_types(typehints.KV[int, str])
+    @with_input_types(x=str)
+    def group_with_upper_ord(x):
+      return (ord(x.upper()) % 5, x)
+
+    # Pipeline checking is off, but the above function should satisfy types at
+    # run-time.
+    result = (self.p
+              | df.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g']).with_output_types(str)
+              | df.Map('gen keys', group_with_upper_ord)
+              | df.GroupByKey('O'))
+
+    assert_that(result, equal_to([(1, ['g']),
+                                  (3, ['s', 'i', 'n']),
+                                  (4, ['t', 'e', 't'])]))
+    self.p.run()
+
+  def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_output_types(typehints.KV[bool, int])
+    @with_input_types(a=int)
+    def is_even_as_key(a):
+      # Simulate a programming error, should be: return (a % 2 == 0, a)
+      # However this returns KV[int, int]
+      return (a % 2, a)
+
+    (self.p
+     | df.Create('nums', range(5)).with_output_types(int)
+     | df.Map('is even', is_even_as_key)
+     | df.GroupByKey('parity'))
+
+    # Although all the types appear to be correct when checked at pipeline
+    # construction. Runtime type-checking should detect the 'is_even_as_key' is
+    # returning Tuple[int, int], instead of Tuple[bool, int].
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(is even): "
+        "Tuple[bool, int] hint type-constraint violated. "
+        "The type of element #0 in the passed tuple is incorrect. "
+        "Expected an instance of type bool, "
+        "instead received an instance of type int.")
+
+  def test_pipeline_checking_satisfied_run_time_checking_satisfied(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    @with_output_types(typehints.KV[bool, int])
+    @with_input_types(a=int)
+    def is_even_as_key(a):
+      # The programming error in the above test-case has now been fixed.
+      # Everything should properly type-check.
+      return (a % 2 == 0, a)
+
+    result = (self.p
+              | df.Create('nums', range(5)).with_output_types(int)
+              | df.Map('is even', is_even_as_key)
+              | df.GroupByKey('parity'))
+
+    assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
+    self.p.run()
+
+  def test_pipeline_runtime_checking_violation_simple_type_input(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hinted applied via the 'with_input_types()' method indicates the
+    # ParDo should receive an instance of type 'str', however an 'int' will be
+    # passed instead.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p | df.Create('n', [1, 2, 3])
+       | (df.FlatMap('to int', lambda x: [int(x)])
+          .with_input_types(str).with_output_types(int))
+      )
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(to int): "
+        "Type-hint for argument: 'x' violated. "
+        "Expected an instance of <type 'str'>, "
+        "instead found 1, an instance of <type 'int'>.")
+
+  def test_pipeline_runtime_checking_violation_composite_type_input(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
+       | (df.FlatMap('add', lambda (x, y): [x + y])
+          .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
+      )
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(add): "
+        "Type-hint for argument: 'y' violated. "
+        "Expected an instance of <type 'int'>, "
+        "instead found 3.0, an instance of <type 'float'>.")
+
+  def test_pipeline_runtime_checking_violation_simple_type_output(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hinted applied via the 'returns()' method indicates the ParDo
+    # should output an instance of type 'int', however a 'float' will be
+    # generated instead.
+    print "HINTS", df.FlatMap(
+        'to int',
+        lambda x: [float(x)]).with_input_types(int).with_output_types(
+            int).get_type_hints()
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p | df.Create('n', [1, 2, 3])
+       | (df.FlatMap('to int', lambda x: [float(x)])
+          .with_input_types(int).with_output_types(int))
+      )
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within "
+        "ParDo(to int): "
+        "According to type-hint expected output should be "
+        "of type <type 'int'>. Instead, received '1.0', "
+        "an instance of type <type 'float'>.")
+
+  def test_pipeline_runtime_checking_violation_composite_type_output(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    # The type-hinted applied via the 'returns()' method indicates the ParDo
+    # should return an instance of type: Tuple[float, int]. However, an instance
+    # of 'int' will be generated instead.
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
+       | (df.FlatMap('swap', lambda (x, y): [x + y])
+          .with_input_types(typehints.Tuple[int, float])
+          .with_output_types(typehints.Tuple[float, int]))
+      )
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within "
+        "ParDo(swap): Tuple type constraint violated. "
+        "Valid object instance must be of type 'tuple'. Instead, "
+        "an instance of 'float' was received.")
+
+  def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    @with_output_types(int)
+    @with_input_types(a=int, b=int)
+    def add(a, b):
+      return a + b
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p | df.Create('t', [1, 2, 3, 4]) | df.Map('add 1', add, 1.0))
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(add 1): "
+        "Type-hint for argument: 'b' violated. "
+        "Expected an instance of <type 'int'>, "
+        "instead found 1.0, an instance of <type 'float'>.")
+
+  def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('t', [1, 2, 3, 4])
+       | (df.Map('add 1', lambda x, one: x + one, 1.0)
+          .with_input_types(int, int)
+          .with_output_types(float)))
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within ParDo(add 1): "
+        "Type-hint for argument: 'one' violated. "
+        "Expected an instance of <type 'int'>, "
+        "instead found 1.0, an instance of <type 'float'>.")
+
+  def test_combine_properly_pipeline_type_checks_using_decorator(self):
+    @with_output_types(int)
+    @with_input_types(ints=typehints.Iterable[int])
+    def sum_ints(ints):
+      return sum(ints)
+
+    d = (self.p
+         | df.Create('t', [1, 2, 3]).with_output_types(int)
+         | df.CombineGlobally('sum', sum_ints))
+
+    self.assertEqual(int, d.element_type)
+    assert_that(d, equal_to([6]))
+    self.p.run()
+
+  def test_combine_func_type_hint_does_not_take_iterable_using_decorator(self):
+    @with_output_types(int)
+    @with_input_types(a=int)
+    def bad_combine(a):
+      5 + a
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('m', [1, 2, 3]).with_output_types(int)
+       | df.CombineGlobally('add', bad_combine))
+
+    self.assertEqual(
+        "All functions for a Combine PTransform must accept a "
+         "single argument compatible with: Iterable[Any]. "
+         "Instead a function with input type: <type 'int'> was received.",
+         e.exception.message)
+
+  def test_combine_pipeline_type_propagation_using_decorators(self):
+    @with_output_types(int)
+    @with_input_types(ints=typehints.Iterable[int])
+    def sum_ints(ints):
+      return sum(ints)
+
+    @with_output_types(typehints.List[int])
+    @with_input_types(n=int)
+    def range_from_zero(n):
+      return list(range(n+1))
+
+    d = (self.p
+         | df.Create('t', [1, 2, 3]).with_output_types(int)
+         | df.CombineGlobally('sum', sum_ints)
+         | df.ParDo('range', range_from_zero))
+
+    self.assertEqual(int, d.element_type)
+    assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
+    self.p.run()
+
+  def test_combine_runtime_type_check_satisfied_using_decorators(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+
+    @with_output_types(int)
+    @with_input_types(ints=typehints.Iterable[int])
+    def iter_mul(ints):
+      return reduce(operator.mul, ints, 1)
+
+    d = (self.p
+         | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
+         | df.CombineGlobally('mul', iter_mul))
+
+    assert_that(d, equal_to([625]))
+    self.p.run()
+
+  def test_combine_runtime_type_check_violation_using_decorators(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    # Combine fn is returning the incorrect type
+    @with_output_types(int)
+    @with_input_types(ints=typehints.Iterable[int])
+    def iter_mul(ints):
+      return str(reduce(operator.mul, ints, 1))
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
+       | df.CombineGlobally('mul', iter_mul))
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within "
+        "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+        "Tuple[TypeVariable[K], int] hint type-constraint violated. "
+        "The type of element #1 in the passed tuple is incorrect. "
+        "Expected an instance of type int, "
+        "instead received an instance of type str.")
+
+  def test_combine_pipeline_type_check_using_methods(self):
+    d = (self.p
+         | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
+         | (df.CombineGlobally('concat', lambda s: ''.join(s))
+            .with_input_types(str).with_output_types(str)))
+
+    def matcher(expected):
+      def match(actual):
+        equal_to(expected)(list(actual[0]))
+      return match
+    assert_that(d, matcher('estt'))
+    self.p.run()
+
+  def test_combine_runtime_type_check_using_methods(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    d = (self.p
+         | df.Create('s', range(5)).with_output_types(int)
+         | (df.CombineGlobally('sum', lambda s: sum(s))
+            .with_input_types(int).with_output_types(int)))
+
+    assert_that(d, equal_to([10]))
+    self.p.run()
+
+  def test_combine_pipeline_type_check_violation_using_methods(self):
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('e', range(3)).with_output_types(int)
+       | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+          .with_input_types(str).with_output_types(str)))
+
+    self.assertEqual("Input type hint violation at sort join: "
+                     "expected <type 'str'>, got <type 'int'>",
+                     e.exception.message)
+
+  def test_combine_runtime_type_check_violation_using_methods(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('e', range(3)).with_output_types(int)
+       | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+          .with_input_types(str).with_output_types(str)))
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within "
+        "ParDo(sort join/KeyWithVoid): "
+        "Type-hint for argument: 'v' violated. "
+        "Expected an instance of <type 'str'>, "
+        "instead found 0, an instance of <type 'int'>.")
+
+  def test_combine_insufficient_type_hint_information(self):
+    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('e', range(3)).with_output_types(int)
+       | df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+       | df.Map('f', lambda x: x + 1))
+
+    self.assertEqual(
+        'Pipeline type checking is enabled, '
+        'however no output type-hint was found for the PTransform '
+        'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+        e.exception.message)
+
+  def test_mean_globally_pipeline_checking_satisfied(self):
+    d = (self.p
+         | df.Create('c', range(5)).with_output_types(int)
+         | combine.Mean.Globally('mean'))
+
+    self.assertTrue(d.element_type is float)
+    assert_that(d, equal_to([2.0]))
+    self.p.run()
+
+  def test_mean_globally_pipeline_checking_violated(self):
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      d = (self.p
+           | df.Create('c', ['test']).with_output_types(str)
+           | combine.Mean.Globally('mean'))
+
+    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+                     "requires Tuple[TypeVariable[K], "
+                     "Iterable[Union[float, int, long]]] "
+                     "but got Tuple[None, Iterable[str]] for p_context",
+                     e.exception.message)
+
+  def test_mean_globally_runtime_checking_satisfied(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    d = (self.p
+         | df.Create('c', range(5)).with_output_types(int)
+         | combine.Mean.Globally('mean'))
+
+    self.assertTrue(d.element_type is float)
+    assert_that(d, equal_to([2.0]))
+    self.p.run()
+
+  def test_mean_globally_runtime_checking_violated(self):
+    # Runtime-only checking: Mean.Globally is fed str elements and run() is
+    # expected to raise a TypeCheckError.
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
+       | combine.Mean.Globally('mean'))
+      self.p.run()
+      # NOTE(review): this assertion is inside the 'with' block, after run().
+      # When run() raises as expected, control leaves the block before this
+      # line, so the message is never actually compared. The expected text
+      # also looks older than the format asserted by the other runtime tests
+      # in this file -- confirm the current message before dedenting it.
+      self.assertEqual("Runtime type violation detected for transform input "
+                       "when executing ParDoFlatMap(Combine): Tuple[Any, "
+                       "Iterable[Union[int, float]]] hint type-constraint "
+                       "violated. The type of element #1 in the passed tuple "
+                       "is incorrect. Iterable[Union[int, float]] hint "
+                       "type-constraint violated. The type of element #0 in "
+                       "the passed Iterable is incorrect: Union[int, float] "
+                       "type-constraint violated. Expected an instance of one "
+                       "of: ('int', 'float'), received str instead.",
+                       e.exception.message)
+
+  def test_mean_per_key_pipeline_checking_satisfied(self):
+    d = (self.p
+         | df.Create('c', range(5)).with_output_types(int)
+         | (df.Map('even group', lambda x: (not x % 2, x))
+            .with_output_types(typehints.KV[bool, int]))
+         | combine.Mean.PerKey('even mean'))
+
+    self.assertCompatible(typehints.KV[bool, float], d.element_type)
+    assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
+    self.p.run()
+
+  def test_mean_per_key_pipeline_checking_violated(self):
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('e', map(str, range(5))).with_output_types(str)
+       | (df.Map('upper pair', lambda x: (x.upper(), x))
+          .with_output_types(typehints.KV[str, str]))
+       | combine.Mean.PerKey('even mean'))
+      self.p.run()
+
+    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+                     "requires Tuple[TypeVariable[K], "
+                     "Iterable[Union[float, int, long]]] "
+                     "but got Tuple[str, Iterable[str]] for p_context",
+                     e.exception.message)
+
+  def test_mean_per_key_runtime_checking_satisfied(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    d = (self.p
+         | df.Create('c', range(5)).with_output_types(int)
+         | (df.Map('odd group', lambda x: (bool(x % 2), x))
+            .with_output_types(typehints.KV[bool, int]))
+         | combine.Mean.PerKey('odd mean'))
+
+    self.assertCompatible(typehints.KV[bool, float], d.element_type)
+    assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
+    self.p.run()
+
+  def test_mean_per_key_runtime_checking_violated(self):
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('c', range(5)).with_output_types(int)
+       | (df.Map('odd group', lambda x: (x, str(bool(x % 2))))
+          .with_output_types(typehints.KV[int, str]))
+       | combine.Mean.PerKey('odd mean'))
+      self.p.run()
+
+    self.assertStartswith(
+        e.exception.message,
+        "Runtime type violation detected within "
+        "ParDo(odd mean/CombinePerKey(MeanCombineFn)/"
+        "Combine/ParDo(CombineValuesDoFn)): "
+        "Type-hint for argument: 'p_context' violated: "
+        "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+        " hint type-constraint violated. "
+        "The type of element #1 in the passed tuple is incorrect. "
+        "Iterable[Union[float, int, long]] "
+        "hint type-constraint violated. The type of element #0 "
+        "in the passed Iterable is incorrect: "
+        "Union[float, int, long] type-constraint violated. "
+        "Expected an instance of one of: "
+        "('float', 'int', 'long'), received str instead.")
+
+  def test_count_globally_pipeline_type_checking_satisfied(self):
+    d = (self.p
+         | df.Create('p', range(5)).with_output_types(int)
+         | combine.Count.Globally('count int'))
+
+    self.assertTrue(d.element_type is int)
+    assert_that(d, equal_to([5]))
+    self.p.run()
+
+  def test_count_globally_runtime_type_checking_satisfied(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    d = (self.p
+         | df.Create('p', range(5)).with_output_types(int)
+         | combine.Count.Globally('count int'))
+
+    self.assertTrue(d.element_type is int)
+    assert_that(d, equal_to([5]))
+    self.p.run()
+
+  def test_count_perkey_pipeline_type_checking_satisfied(self):
+    d = (self.p
+         | df.Create('p', range(5)).with_output_types(int)
+         | (df.Map('even group', lambda x: (not x % 2, x))
+            .with_output_types(typehints.KV[bool, int]))
+         | combine.Count.PerKey('count int'))
+
+    self.assertCompatible(typehints.KV[bool, int], d.element_type)
+    assert_that(d, equal_to([(False, 2), (True, 3)]))
+    self.p.run()
+
+  def test_count_perkey_pipeline_type_checking_violated(self):
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      (self.p
+       | df.Create('p', range(5)).with_output_types(int)
+       | combine.Count.PerKey('count int'))
+
+    self.assertEqual("Input type hint violation at GroupByKey: "
+                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+                     "got <type 'int'>",
+                     e.exception.message)
+
+  def test_count_perkey_runtime_type_checking_satisfied(self):
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    d = (self.p
+         | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
+         | df.Map('dup key', lambda x: (x, x)).with_output_types(typehints.KV[str, str])
+         | combine.Count.PerKey('count dups'))
+
+    self.assertCompatible(typehints.KV[str, int], d.element_type)
+    assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
+    self.p.run()
+
+  def test_count_perelement_pipeline_type_checking_satisfied(self):
+    """Count.PerElement on int input is typed KV[int, int] and counts dups."""
+    ints = self.p | df.Create('w', [1, 1, 2, 3]).with_output_types(int)
+    counts = ints | combine.Count.PerElement('count elems')
+
+    self.assertCompatible(typehints.KV[int, int], counts.element_type)
+    assert_that(counts, equal_to([(1, 2), (2, 1), (3, 1)]))
+    self.p.run()
+
+  def test_count_perelement_pipeline_type_checking_violated(self):
+    """Under 'ALL_REQUIRED' strictness an unhinted Create raises an error."""
+    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      # No output type hint is attached to this Create.
+      unhinted = self.p | df.Create('f', [1, 1, 2, 3])
+      _ = unhinted | combine.Count.PerElement('count elems')
+
+    expected_message = ('Pipeline type checking is enabled, however no output '
+                        'type-hint was found for the PTransform '
+                        'Create(f)')
+    self.assertEqual(expected_message, e.exception.message)
+
+  def test_count_perelement_runtime_type_checking_satisfied(self):
+    """Count.PerElement on bool input passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    flags = (self.p
+             | df.Create('w', [True, True, False, True, True])
+             .with_output_types(bool))
+    counts = flags | combine.Count.PerElement('count elems')
+
+    self.assertCompatible(typehints.KV[bool, int], counts.element_type)
+    assert_that(counts, equal_to([(False, 1), (True, 4)]))
+    self.p.run()
+
+  def test_top_of_pipeline_checking_satisfied(self):
+    """Top.Of on int input is typed Iterable[int] and yields the top three."""
+    ints = self.p | df.Create('n', range(5, 11)).with_output_types(int)
+    top_three = ints | combine.Top.Of('top 3', 3, lambda x, y: x < y)
+
+    self.assertCompatible(typehints.Iterable[int], top_three.element_type)
+    assert_that(top_three, equal_to([[10, 9, 8]]))
+    self.p.run()
+
+  def test_top_of_runtime_checking_satisfied(self):
+    """Top.Of on str input passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    letters = self.p | df.Create('n', list('testing')).with_output_types(str)
+    top_three = letters | combine.Top.Of('acii top', 3, lambda x, y: x < y)
+
+    self.assertCompatible(typehints.Iterable[str], top_three.element_type)
+    assert_that(top_three, equal_to([['t', 't', 's']]))
+    self.p.run()
+
+  def test_per_key_pipeline_checking_violated(self):
+    """Top.PerKey on plain int input raises TypeCheckError when applied."""
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      # The pipeline is built only for the error it raises, so the result is
+      # explicitly discarded instead of being bound to an unused local.
+      _ = (self.p
+           | df.Create('n', range(100)).with_output_types(int)
+           | df.Map('num + 1', lambda x: x + 1).with_output_types(int)
+           | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+
+    self.assertEqual("Input type hint violation at GroupByKey: "
+                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
+                     "got <type 'int'>",
+                     e.exception.message)
+
+  def test_per_key_pipeline_checking_satisfied(self):
+    """Top.PerKey on KV[int, int] input yields the largest value per key."""
+    ints = self.p | df.Create('n', range(100)).with_output_types(int)
+    # Key each number by its remainder modulo 3.
+    keyed = ints | (df.Map('group mod 3', lambda x: (x % 3, x))
+                    .with_output_types(typehints.KV[int, int]))
+    top_per_key = keyed | combine.Top.PerKey('top mod', 1, lambda a, b: a < b)
+
+    self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
+                          top_per_key.element_type)
+    assert_that(top_per_key, equal_to([(0, [99]), (1, [97]), (2, [98])]))
+    self.p.run()
+
+  def test_per_key_runtime_checking_satisfied(self):
+    """Top.PerKey on KV[int, int] input passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    # Note: this Create carries no explicit output type hint.
+    ints = self.p | df.Create('n', range(21))
+    keyed = ints | (df.Map('group mod 3', lambda x: (x % 3, x))
+                    .with_output_types(typehints.KV[int, int]))
+    top_per_key = keyed | combine.Top.PerKey('top mod', 1, lambda a, b: a < b)
+
+    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+                          top_per_key.element_type)
+    assert_that(top_per_key, equal_to([(0, [18]), (1, [19]), (2, [20])]))
+    self.p.run()
+
+  def test_sample_globally_pipeline_satisfied(self):
+    """Sample.FixedSizeGlobally on ints is typed Iterable[int], sample size 3."""
+    ints = self.p | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
+    sampled = ints | combine.Sample.FixedSizeGlobally('sample', 3)
+
+    self.assertCompatible(typehints.Iterable[int], sampled.element_type)
+
+    def has_sample_of_size(expected_len):
+      # Only the length of the first output element is compared, not its
+      # contents.
+      def check(actual):
+        sample_len = len(actual[0])
+        equal_to([expected_len])([sample_len])
+      return check
+
+    assert_that(sampled, has_sample_of_size(3))
+    self.p.run()
+
+  def test_sample_globally_runtime_satisfied(self):
+    """Sample.FixedSizeGlobally passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    ints = self.p | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
+    sampled = ints | combine.Sample.FixedSizeGlobally('sample', 2)
+
+    self.assertCompatible(typehints.Iterable[int], sampled.element_type)
+
+    def has_sample_of_size(expected_len):
+      # Only the length of the first output element is compared, not its
+      # contents.
+      def check(actual):
+        sample_len = len(actual[0])
+        equal_to([expected_len])([sample_len])
+      return check
+
+    assert_that(sampled, has_sample_of_size(2))
+    self.p.run()
+
+  def test_sample_per_key_pipeline_satisfied(self):
+    """Sample.FixedSizePerKey on KV[int, int] yields a 2-element sample per key."""
+    pairs = self.p | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+                      .with_output_types(typehints.KV[int, int]))
+    sampled = pairs | combine.Sample.FixedSizePerKey('sample', 2)
+
+    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+                          sampled.element_type)
+
+    def has_samples_of_size(expected_len):
+      # Every (key, sample) output is checked for sample length only.
+      def check(actual):
+        for _, sample in actual:
+          sample_len = len(sample)
+          equal_to([expected_len])([sample_len])
+      return check
+
+    assert_that(sampled, has_samples_of_size(2))
+    self.p.run()
+
+  def test_sample_per_key_runtime_satisfied(self):
+    """Sample.FixedSizePerKey passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    pairs = self.p | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+                      .with_output_types(typehints.KV[int, int]))
+    sampled = pairs | combine.Sample.FixedSizePerKey('sample', 1)
+
+    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
+                          sampled.element_type)
+
+    def has_samples_of_size(expected_len):
+      # Every (key, sample) output is checked for sample length only.
+      def check(actual):
+        for _, sample in actual:
+          sample_len = len(sample)
+          equal_to([expected_len])([sample_len])
+      return check
+
+    assert_that(sampled, has_samples_of_size(1))
+    self.p.run()
+
+  def test_to_list_pipeline_check_satisfied(self):
+    """ToList on int input is typed List[int] and holds all the elements."""
+    ints = self.p | df.Create('c', (1, 2, 3, 4)).with_output_types(int)
+    as_list = ints | combine.ToList('to list')
+
+    self.assertCompatible(typehints.List[int], as_list.element_type)
+
+    def first_element_equal_to(expected):
+      # Applies the equal_to matcher to the first (list) output element.
+      def check(actual):
+        produced_list = actual[0]
+        equal_to(expected)(produced_list)
+      return check
+
+    assert_that(as_list, first_element_equal_to([1, 2, 3, 4]))
+    self.p.run()
+
+  def test_to_list_runtime_check_satisfied(self):
+    """ToList on str input passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    letters = self.p | df.Create('c', list('test')).with_output_types(str)
+    as_list = letters | combine.ToList('to list')
+
+    self.assertCompatible(typehints.List[str], as_list.element_type)
+
+    def first_element_equal_to(expected):
+      # Applies the equal_to matcher to the first (list) output element.
+      def check(actual):
+        produced_list = actual[0]
+        equal_to(expected)(produced_list)
+      return check
+
+    assert_that(as_list, first_element_equal_to(['e', 's', 't', 't']))
+    self.p.run()
+
+  def test_to_dict_pipeline_check_violated(self):
+    """ToDict on plain int input raises TypeCheckError when applied."""
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      # The pipeline is built only for the error it raises, so the result is
+      # explicitly discarded instead of being bound to an unused local.
+      _ = (self.p
+           | df.Create('d', [1, 2, 3, 4]).with_output_types(int)
+           | combine.ToDict('to dict'))
+
+    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
+                     "requires Tuple[TypeVariable[K], "
+                     "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
+                     "but got Tuple[None, Iterable[int]] for p_context",
+                     e.exception.message)
+
+  def test_to_dict_pipeline_check_satisfied(self):
+    """ToDict on Tuple[int, int] input is typed Dict[int, int]."""
+    pairs = self.p | (df.Create('d', [(1, 2), (3, 4)])
+                      .with_output_types(typehints.Tuple[int, int]))
+    as_dict = pairs | combine.ToDict('to dict')
+
+    self.assertCompatible(typehints.Dict[int, int], as_dict.element_type)
+    assert_that(as_dict, equal_to([{1: 2, 3: 4}]))
+    self.p.run()
+
+  def test_to_dict_runtime_check_satisfied(self):
+    """ToDict on Tuple[str, int] input passes with runtime type checks on."""
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    pairs = self.p | (df.Create('d', [('1', 2), ('3', 4)])
+                      .with_output_types(typehints.Tuple[str, int]))
+    as_dict = pairs | combine.ToDict('to dict')
+
+    self.assertCompatible(typehints.Dict[str, int], as_dict.element_type)
+    assert_that(as_dict, equal_to([{'1': 2, '3': 4}]))
+    self.p.run()
+
+  def test_runtime_type_check_python_type_error(self):
+    """A plain Python TypeError in user code is not reported as TypeCheckError.
+
+    The mapped function calls len() on ints, which satisfies the declared type
+    hints but fails inside Python itself when the pipeline runs.
+    """
+    self.p.options.view_as(TypeOptions).runtime_type_check = True
+
+    with self.assertRaises(TypeError) as e:
+      (self.p
+       | df.Create('t', [1, 2, 3]).with_output_types(int)
+       | df.Map('len', lambda x: len(x)).with_output_types(int))
+      self.p.run()
+
+    # Our special type-checking related TypeError shouldn't have been raised.
+    # Instead the above pipeline should have triggered a regular Python runtime
+    # TypeError.
+    self.assertEqual("object of type 'int' has no len() [while running 'len']",
+                     e.exception.message)
+    # Check the caught exception itself: 'e' is only the assertRaises context
+    # manager, so testing 'e' directly could never fail.
+    self.assertFalse(isinstance(e.exception, typehints.TypeCheckError))
+
+  def test_pardo_type_inference(self):
+    """Filter keeps its input type; Map infers the lambda's tuple type."""
+    filtered_type = df.Filter(lambda x: False).infer_output_type(int)
+    self.assertEqual(int, filtered_type)
+
+    mapped_type = df.Map(lambda x: (x, 1)).infer_output_type(str)
+    self.assertEqual(typehints.Tuple[str, int], mapped_type)
+
+  def test_gbk_type_inference(self):
+    """GroupByKeyOnly maps KV[str, int] to Tuple[str, Iterable[int]]."""
+    grouped_type = df.core.GroupByKeyOnly().infer_output_type(
+        typehints.KV[str, int])
+    self.assertEqual(typehints.Tuple[str, typehints.Iterable[int]],
+                     grouped_type)
+
+  def test_pipeline_inference(self):
+    """Element types are inferred through Create, Map and GroupByKey."""
+    letters = self.p | df.Create('c', ['a', 'b', 'c'])
+    paired = letters | df.Map('pair with 1', lambda x: (x, 1))
+    by_letter = paired | df.GroupByKey()
+
+    # None of the transforms above carries an explicit type hint.
+    self.assertEqual(str, letters.element_type)
+    self.assertEqual(typehints.KV[str, int], paired.element_type)
+    self.assertEqual(typehints.KV[str, typehints.Iterable[int]],
+                     by_letter.element_type)
+
+  def test_inferred_bad_kv_type(self):
+    """GroupByKey rejects an inferred 3-tuple element type."""
+    with self.assertRaises(typehints.TypeCheckError) as e:
+      letters = self.p | df.Create('t', ['a', 'b', 'c'])
+      # A three-element tuple is not a key-value pair.
+      triples = letters | df.Map('ungroupable', lambda x: (x, 0, 1.0))
+      _ = triples | df.GroupByKey()
+
+    expected_message = ('Input type hint violation at GroupByKey: '
+                        'expected Tuple[TypeVariable[K], TypeVariable[V]], '
+                        'got Tuple[str, int, float]')
+    self.assertEqual(expected_message, e.exception.message)
+
+  def test_type_inference_command_line_flag_toggle(self):
+    """Element types are inferred only while pipeline_type_check is on."""
+    # With the flag off, Create leaves the element type unset.
+    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    unchecked = self.p | df.Create('t', [1, 2, 3, 4])
+    self.assertIsNone(unchecked.element_type)
+
+    # With the flag back on, the element type is inferred from the values.
+    self.p.options.view_as(TypeOptions).pipeline_type_check = True
+    checked = self.p | df.Create('m', [1, 2, 3, 4])
+    self.assertEqual(int, checked.element_type)
+
+
+# Run the unittest test runner when this module is executed as a script.
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
new file mode 100644
index 0000000..b8efe82
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -0,0 +1,145 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Internal side input transforms and implementations.
+
+Important: this module is an implementation detail and should not be used
+directly by pipeline writers. Instead, users should use the helper methods
+AsSingleton, AsIter, AsList and AsDict in google.cloud.dataflow.pvalue.
+"""
+
+from __future__ import absolute_import
+
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.transforms.ptransform import PTransform
+
+
+class CreatePCollectionView(PTransform):
+  """Transform to materialize a given PCollectionView in the pipeline.
+
+  Important: this transform is an implementation detail and should not be used
+  directly by pipeline writers.
+  """
+
+  def __init__(self, view):
+    """Stores the view that apply() hands back.
+
+    Args:
+      view: The PCollectionView object this transform produces.
+    """
+    self.view = view
+    super(CreatePCollectionView, self).__init__()
+
+  def infer_output_type(self, input_type):
+    """Returns input_type unchanged as the inferred output type."""
+    # TODO(ccy): Figure out if we want to create a new type of type hint, i.e.,
+    # typehints.View[...].
+    return input_type
+
+  def apply(self, pcoll):
+    """Returns the view given at construction; pcoll itself is not read here."""
+    return self.view
+
+
+class ViewAsSingleton(PTransform):
+  """Transform to view PCollection as a singleton PCollectionView.
+
+  Important: this transform is an implementation detail and should not be used
+  directly by pipeline writers. Use pvalue.AsSingleton(...) instead.
+  """
+
+  def __init__(self, has_default, default_value, label=None):
+    """Initializes the transform.
+
+    Args:
+      has_default: Passed through to pvalue.SingletonPCollectionView.
+      default_value: Passed through to pvalue.SingletonPCollectionView.
+      label: Optional label; when truthy it is wrapped as
+        'ViewAsSingleton(<label>)'.
+    """
+    full_label = ('ViewAsSingleton(%s)' % label) if label else label
+    super(ViewAsSingleton, self).__init__(label=full_label)
+    self.has_default = has_default
+    self.default_value = default_value
+
+  def apply(self, pcoll):
+    """Returns pcoll piped through a CreatePCollectionView for a singleton."""
+    self._check_pcollection(pcoll)
+    element_type = pcoll.element_type
+    view = pvalue.SingletonPCollectionView(
+        pcoll.pipeline, self.has_default, self.default_value)
+    # Input and output are hinted with the same element type.
+    materialize = (CreatePCollectionView(view)
+                   .with_input_types(element_type)
+                   .with_output_types(element_type))
+    return pcoll | materialize
+
+
+class ViewAsIterable(PTransform):
+  """Transform to view PCollection as an iterable PCollectionView.
+
+  Important: this transform is an implementation detail and should not be used
+  directly by pipeline writers. Use pvalue.AsIter(...) instead.
+  """
+
+  def __init__(self, label=None):
+    """Initializes the transform.
+
+    Args:
+      label: Optional label; when truthy it is wrapped as
+        'ViewAsIterable(<label>)'.
+    """
+    full_label = ('ViewAsIterable(%s)' % label) if label else label
+    super(ViewAsIterable, self).__init__(label=full_label)
+
+  def apply(self, pcoll):
+    """Returns pcoll piped through a CreatePCollectionView for an iterable."""
+    self._check_pcollection(pcoll)
+    element_type = pcoll.element_type
+    iterable_type = typehints.Iterable[element_type]
+    view = pvalue.IterablePCollectionView(pcoll.pipeline)
+    materialize = (CreatePCollectionView(view)
+                   .with_input_types(element_type)
+                   .with_output_types(iterable_type))
+    return pcoll | materialize
+
+
+class ViewAsList(PTransform):
+  """Transform to view PCollection as a list PCollectionView.
+
+  Important: this transform is an implementation detail and should not be used
+  directly by pipeline writers. Use pvalue.AsList(...) instead.
+  """
+
+  def __init__(self, label=None):
+    """Initializes the transform.
+
+    Args:
+      label: Optional label; when truthy it is wrapped as
+        'ViewAsList(<label>)'.
+    """
+    full_label = ('ViewAsList(%s)' % label) if label else label
+    super(ViewAsList, self).__init__(label=full_label)
+
+  def apply(self, pcoll):
+    """Returns pcoll piped through a CreatePCollectionView for a list."""
+    self._check_pcollection(pcoll)
+    element_type = pcoll.element_type
+    list_type = typehints.List[element_type]
+    view = pvalue.ListPCollectionView(pcoll.pipeline)
+    materialize = (CreatePCollectionView(view)
+                   .with_input_types(element_type)
+                   .with_output_types(list_type))
+    return pcoll | materialize
+
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+@typehints.with_input_types(typehints.Tuple[K, V])
+@typehints.with_output_types(typehints.Dict[K, V])
+class ViewAsDict(PTransform):  # pylint: disable=g-wrong-blank-lines
+  """Transform to view PCollection as a dict PCollectionView.
+
+  Important: this transform is an implementation detail and should not be used
+  directly by pipeline writers. Use pvalue.AsDict(...) instead.
+  """
+
+  def __init__(self, label=None):
+    """Initializes the transform.
+
+    Args:
+      label: Optional label; when truthy it is wrapped as
+        'ViewAsDict(<label>)'.
+    """
+    full_label = ('ViewAsDict(%s)' % label) if label else label
+    super(ViewAsDict, self).__init__(label=full_label)
+
+  def apply(self, pcoll):
+    """Returns pcoll piped through a CreatePCollectionView for a dict."""
+    self._check_pcollection(pcoll)
+    element_type = pcoll.element_type
+    # Split the element type into its key and value types to build the Dict
+    # output hint.
+    key_type, value_type = (
+        typehints.trivial_inference.key_value_types(element_type))
+    dict_type = typehints.Dict[key_type, value_type]
+    view = pvalue.DictPCollectionView(pcoll.pipeline)
+    materialize = (CreatePCollectionView(view)
+                   .with_input_types(element_type)
+                   .with_output_types(dict_type))
+    return pcoll | materialize

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
new file mode 100644
index 0000000..7b750f9
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -0,0 +1,310 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Time and timer utilities."""
+
+from __future__ import absolute_import
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import datetime
+import sys
+
+
+class Timestamp(object):
+  """Represents a Unix second timestamp with microsecond granularity.
+
+  Can be treated in common timestamp arithmetic operations as a numeric type.
+
+  Internally stores a time interval as an int of microseconds. This strategy
+  is necessary since floating point values lose precision when storing values,
+  especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
+  to 0.0999999994448885).
+  """
+
+  def __init__(self, seconds=0, micros=0):
+    """Creates a Timestamp from seconds and/or microseconds.
+
+    Args:
+      seconds: Number of seconds; scaled to microseconds and truncated by
+        int().
+      micros: Additional number of microseconds, truncated by int().
+    """
+    self.micros = int(seconds * 1000000) + int(micros)
+
+  @staticmethod
+  def of(seconds):
+    """Return the Timestamp for the given number of seconds.
+
+    If the input is already a Timestamp, the input itself will be returned.
+
+    Args:
+      seconds: Number of seconds as int, float or Timestamp.
+
+    Returns:
+      Corresponding Timestamp object.
+
+    Raises:
+      TypeError: If seconds is a Duration.
+    """
+
+    if isinstance(seconds, Duration):
+      raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
+    if isinstance(seconds, Timestamp):
+      return seconds
+    return Timestamp(seconds)
+
+  def __repr__(self):
+    """Returns 'Timestamp(<seconds>)', with six decimals if fractional."""
+    micros = self.micros
+    sign = ''
+    if micros < 0:
+      sign = '-'
+      micros = -micros
+    # divmod keeps both parts as ints whether or not '/' performs true
+    # division in the running interpreter.
+    int_part, frac_part = divmod(micros, 1000000)
+    if frac_part:
+      return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
+    else:
+      return 'Timestamp(%s%d)' % (sign, int_part)
+
+  def to_utc_datetime(self):
+    """Returns a naive datetime equal to the epoch plus this many micros."""
+    epoch = datetime.datetime.utcfromtimestamp(0)
+    # We can't easily construct a datetime object from microseconds, so we
+    # create one at the epoch and add an appropriate timedelta interval.
+    return epoch + datetime.timedelta(microseconds=self.micros)
+
+  def isoformat(self):
+    """Returns the UTC datetime in ISO format with a trailing 'Z'."""
+    # Append 'Z' for UTC timezone.
+    return self.to_utc_datetime().isoformat() + 'Z'
+
+  def __float__(self):
+    # Note that the returned value may have lost precision.
+    return float(self.micros) / 1000000
+
+  def __int__(self):
+    # Note that the returned value may have lost precision.
+    # Explicit floor division so the result is always an int; a plain '/'
+    # only yields an int where it means integer division.
+    return self.micros // 1000000
+
+  def __cmp__(self, other):
+    """Compares by microseconds; non-Duration values go via Timestamp.of."""
+    # Allow comparisons between Duration and Timestamp values.
+    if not isinstance(other, Duration):
+      other = Timestamp.of(other)
+    return cmp(self.micros, other.micros)
+
+  def __hash__(self):
+    return hash(self.micros)
+
+  def __add__(self, other):
+    """Returns this Timestamp shifted forward by other (as a Duration)."""
+    other = Duration.of(other)
+    return Timestamp(micros=self.micros + other.micros)
+
+  def __radd__(self, other):
+    return self + other
+
+  def __sub__(self, other):
+    """Returns this Timestamp shifted back by other (as a Duration)."""
+    other = Duration.of(other)
+    return Timestamp(micros=self.micros - other.micros)
+
+  def __mod__(self, other):
+    """Returns the remainder modulo other (as a Duration), as a Duration."""
+    other = Duration.of(other)
+    return Duration(micros=self.micros % other.micros)
+
+
+# Sentinel minimum and maximum timestamps. Their microsecond values are the
+# bounds given by sys.maxint (-sys.maxint - 1 and sys.maxint respectively).
+MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
+MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
+
+
+class Duration(object):
+  """Represents a second duration with microsecond granularity.
+
+  Can be treated in common arithmetic operations as a numeric type.
+
+  Internally stores a time interval as an int of microseconds. This strategy
+  is necessary since floating point values lose precision when storing values,
+  especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
+  to 0.0999999994448885).
+  """
+
+  def __init__(self, seconds=0, micros=0):
+    """Creates a Duration from seconds and/or microseconds.
+
+    Args:
+      seconds: Number of seconds; scaled to microseconds and truncated by
+        int().
+      micros: Additional number of microseconds, truncated by int().
+    """
+    self.micros = int(seconds * 1000000) + int(micros)
+
+  @staticmethod
+  def of(seconds):
+    """Return the Duration for the given number of seconds.
+
+    If the input is already a Duration, the input itself will be returned.
+
+    Args:
+      seconds: Number of seconds as int, float or Duration.
+
+    Returns:
+      Corresponding Duration object.
+
+    Raises:
+      TypeError: If seconds is a Timestamp.
+    """
+
+    if isinstance(seconds, Timestamp):
+      raise TypeError('Can\'t interpret %s as Duration.' % seconds)
+    if isinstance(seconds, Duration):
+      return seconds
+    return Duration(seconds)
+
+  def __repr__(self):
+    """Returns 'Duration(<seconds>)', with six decimals if fractional."""
+    micros = self.micros
+    sign = ''
+    if micros < 0:
+      sign = '-'
+      micros = -micros
+    # divmod keeps both parts as ints whether or not '/' performs true
+    # division in the running interpreter.
+    int_part, frac_part = divmod(micros, 1000000)
+    if frac_part:
+      return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part)
+    else:
+      return 'Duration(%s%d)' % (sign, int_part)
+
+  def __float__(self):
+    # Note that the returned value may have lost precision.
+    return float(self.micros) / 1000000
+
+  def __int__(self):
+    # Note that the returned value may have lost precision.
+    # Explicit floor division so the result is always an int; a plain '/'
+    # only yields an int where it means integer division.
+    return self.micros // 1000000
+
+  def __cmp__(self, other):
+    """Compares by microseconds; non-Timestamp values go via Duration.of."""
+    # Allow comparisons between Duration and Timestamp values.
+    if not isinstance(other, Timestamp):
+      other = Duration.of(other)
+    return cmp(self.micros, other.micros)
+
+  def __hash__(self):
+    return hash(self.micros)
+
+  def __neg__(self):
+    return Duration(micros=-self.micros)
+
+  def __add__(self, other):
+    """Adds a Duration-like value, or defers to Timestamp + Duration."""
+    if isinstance(other, Timestamp):
+      return other + self
+    other = Duration.of(other)
+    return Duration(micros=self.micros + other.micros)
+
+  def __radd__(self, other):
+    return self + other
+
+  def __sub__(self, other):
+    other = Duration.of(other)
+    return Duration(micros=self.micros - other.micros)
+
+  def __rsub__(self, other):
+    # other - self, computed as the negation of self - other.
+    return -(self - other)
+
+  def __mul__(self, other):
+    """Multiplies by other (read as seconds), flooring to whole micros."""
+    other = Duration.of(other)
+    # Both operands are in microseconds, so the product is divided by 10**6;
+    # '//' keeps this exact integer arithmetic under any division mode.
+    return Duration(micros=self.micros * other.micros // 1000000)
+
+  def __rmul__(self, other):
+    return self * other
+
+  def __mod__(self, other):
+    other = Duration.of(other)
+    return Duration(micros=self.micros % other.micros)
+
+
+class TimeDomain(object):
+  """Time domain for streaming timers."""
+
+  WATERMARK = 'WATERMARK'
+  REAL_TIME = 'REAL_TIME'
+  DEPENDENT_REAL_TIME = 'DEPENDENT_REAL_TIME'
+
+  @staticmethod
+  def from_string(domain):
+    """Validates a time domain name.
+
+    Args:
+      domain: Candidate time domain value.
+
+    Returns:
+      The given domain, unchanged, if it is one of the known domains.
+
+    Raises:
+      ValueError: If domain is not one of the known domains.
+    """
+    known_domains = (TimeDomain.WATERMARK,
+                     TimeDomain.REAL_TIME,
+                     TimeDomain.DEPENDENT_REAL_TIME)
+    if domain not in known_domains:
+      raise ValueError('Unknown time domain: %s' % domain)
+    return domain
+
+
+class OutputTimeFnImpl(object):
+  """Implementation of OutputTimeFn."""
+
+  __metaclass__ = ABCMeta
+
+  @abstractmethod
+  def assign_output_time(self, window, input_timestamp):
+    """Returns the output time for input_timestamp in window (abstract)."""
+    pass
+
+  @abstractmethod
+  def combine(self, output_timestamp, other_output_timestamp):
+    """Combines two output timestamps into one (abstract)."""
+    pass
+
+  def combine_all(self, merging_timestamps):
+    """Folds combine() over the given timestamps.
+
+    Args:
+      merging_timestamps: Iterable of timestamps to combine.
+
+    Returns:
+      The combined timestamp, or None if there were no timestamps.
+    """
+    result = None
+    for timestamp in merging_timestamps:
+      # While nothing has been accumulated yet, adopt the timestamp as is.
+      result = (timestamp if result is None
+                else self.combine(result, timestamp))
+    return result
+
+  def merge(self, unused_result_window, merging_timestamps):
+    """Returns combine_all(merging_timestamps); the window is ignored."""
+    return self.combine_all(merging_timestamps)
+
+
+class DependsOnlyOnWindow(OutputTimeFnImpl):
+  """OutputTimeFnImpl that only depends on the window."""
+
+  __metaclass__ = ABCMeta
+
+  def combine(self, output_timestamp, other_output_timestamp):
+    """Returns the first timestamp; the second one is ignored."""
+    return output_timestamp
+
+  def merge(self, result_window, unused_merging_timestamps):
+    """Returns the output time assigned for result_window alone."""
+    # Since we know that the result only depends on the window, we can ignore
+    # the given timestamps.
+    return self.assign_output_time(result_window, None)
+
+
+class OutputAtEarliestInputTimestampImpl(OutputTimeFnImpl):
+  """OutputTimeFnImpl outputting at earliest input timestamp."""
+
+  def assign_output_time(self, window, input_timestamp):
+    """Returns input_timestamp unchanged; window is not consulted."""
+    return input_timestamp
+
+  def combine(self, output_timestamp, other_output_timestamp):
+    """Default to returning the earlier of two timestamps."""
+    return min(output_timestamp, other_output_timestamp)
+
+
+class OutputAtEarliestTransformedInputTimestampImpl(OutputTimeFnImpl):
+  """OutputTimeFnImpl outputting at earliest transformed input timestamp.
+
+  Input timestamps are transformed by the given window_fn's
+  get_transformed_output_time() before the earliest one is chosen.
+  """
+
+  def __init__(self, window_fn):
+    # window_fn must provide get_transformed_output_time(window, timestamp).
+    self.window_fn = window_fn
+
+  def assign_output_time(self, window, input_timestamp):
+    """Returns window_fn's transformed output time for the input timestamp."""
+    return self.window_fn.get_transformed_output_time(window, input_timestamp)
+
+  def combine(self, output_timestamp, other_output_timestamp):
+    """Returns the earlier of the two timestamps."""
+    return min(output_timestamp, other_output_timestamp)
+
+
+class OutputAtLatestInputTimestampImpl(OutputTimeFnImpl):
+  """OutputTimeFnImpl outputting at latest input timestamp."""
+
+  def assign_output_time(self, window, input_timestamp):
+    """Returns input_timestamp unchanged; window is not consulted."""
+    return input_timestamp
+
+  def combine(self, output_timestamp, other_output_timestamp):
+    """Returns the later of the two timestamps."""
+    return max(output_timestamp, other_output_timestamp)
+
+
+class OutputAtEndOfWindowImpl(DependsOnlyOnWindow):
+  """OutputTimeFnImpl outputting at end of window."""
+
+  def assign_output_time(self, window, unused_input_timestamp):
+    """Returns window.end; the input timestamp is ignored."""
+    return window.end