Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:41 UTC

[06/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/ptransform_test.py b/sdks/python/google/cloud/dataflow/transforms/ptransform_test.py
deleted file mode 100644
index 00b6c8d..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/ptransform_test.py
+++ /dev/null
@@ -1,1814 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the PTransform and descendants."""
-
-from __future__ import absolute_import
-
-import operator
-import re
-import unittest
-
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.pipeline import Pipeline
-import google.cloud.dataflow.pvalue as pvalue
-import google.cloud.dataflow.transforms.combiners as combine
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-import google.cloud.dataflow.typehints as typehints
-from google.cloud.dataflow.typehints import with_input_types
-from google.cloud.dataflow.typehints import with_output_types
-from google.cloud.dataflow.typehints.typehints_test import TypeHintTestCase
-from google.cloud.dataflow.utils.options import PipelineOptions
-from google.cloud.dataflow.utils.options import TypeOptions
-
-
-# Disable frequent lint warning due to pipe operator for chaining transforms.
-# pylint: disable=expression-not-assigned
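-#
-# Transforms throughout these tests are chained with the pipe operator,
-# e.g. `pcoll | df.Create('label', [...])`, which is shorthand for apply().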
-
-
-class PTransformTest(unittest.TestCase):
-
-  def assertStartswith(self, msg, prefix):
-    self.assertTrue(msg.startswith(prefix),
-                    '"%s" does not start with "%s"' % (msg, prefix))
-
-  def test_str(self):
-    self.assertEqual('<PTransform(PTransform) label=[PTransform]>',
-                     str(PTransform()))
-
-    pa = Pipeline('DirectPipelineRunner')
-    res = pa | df.Create('a_label', [1, 2])
-    self.assertEqual('<Create(PTransform) label=[a_label]>',
-                     str(res.producer.transform))
-
-    pc = Pipeline('DirectPipelineRunner')
-    res = pc | df.Create('with_inputs', [1, 2])
-    inputs_tr = res.producer.transform
-    inputs_tr.inputs = ('ci',)
-    self.assertEqual(
-        """<Create(PTransform) label=[with_inputs] inputs=('ci',)>""",
-        str(inputs_tr))
-
-    pd = Pipeline('DirectPipelineRunner')
-    res = pd | df.Create('with_sidei', [1, 2])
-    side_tr = res.producer.transform
-    side_tr.side_inputs = (4,)
-    self.assertEqual(
-        '<Create(PTransform) label=[with_sidei] side_inputs=(4,)>',
-        str(side_tr))
-
-    inputs_tr.side_inputs = ('cs',)
-    self.assertEqual(
-        """<Create(PTransform) label=[with_inputs] """
-        """inputs=('ci',) side_inputs=('cs',)>""",
-        str(inputs_tr))
-
-  def test_parse_label_and_arg(self):
-
-    def fun(*args, **kwargs):
-      return PTransform().parse_label_and_arg(args, kwargs, 'name')
-
-    self.assertEqual(('PTransform', 'value'), fun('value'))
-    self.assertEqual(('PTransform', 'value'), fun(name='value'))
-    self.assertEqual(('label', 'value'), fun('label', 'value'))
-    self.assertEqual(('label', 'value'), fun('label', name='value'))
-    self.assertEqual(('label', 'value'), fun('value', label='label'))
-    self.assertEqual(('label', 'value'), fun(name='value', label='label'))
-
-    self.assertRaises(ValueError, fun)
-    self.assertRaises(ValueError, fun, 0, 'value')
-    self.assertRaises(ValueError, fun, label=0, name='value')
-    self.assertRaises(ValueError, fun, other='value')
-
-    with self.assertRaises(ValueError) as cm:
-      fun(0, name='value')
-    self.assertEqual(
-        cm.exception.message,
-        'PTransform expects a (label, name) or (name) argument list '
-        'instead of args=(0,), kwargs={\'name\': \'value\'}')
-
-  def test_do_with_do_fn(self):
-    class AddNDoFn(df.DoFn):
-
-      def process(self, context, addon):
-        return [context.element + addon]
-
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    result = pcoll | df.ParDo('do', AddNDoFn(), 10)
-    assert_that(result, equal_to([11, 12, 13]))
-    pipeline.run()
-
-  def test_do_with_unconstructed_do_fn(self):
-    class MyDoFn(df.DoFn):
-
-      def process(self, context):
-        pass
-
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    with self.assertRaises(ValueError):
-      pcoll | df.ParDo('do', MyDoFn)  # Note the lack of ()'s
-
-  def test_do_with_callable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    result = pcoll | df.FlatMap('do', lambda x, addon: [x + addon], 10)
-    assert_that(result, equal_to([11, 12, 13]))
-    pipeline.run()
-
-  def test_do_with_side_input_as_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | df.Create('side', [10])
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    result = pcoll | df.FlatMap(
-        'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
-    assert_that(result, equal_to([11, 12, 13]))
-    pipeline.run()
-
-  def test_do_with_side_input_as_keyword_arg(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | df.Create('side', [10])
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    result = pcoll | df.FlatMap(
-        'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
-    assert_that(result, equal_to([11, 12, 13]))
-    pipeline.run()
-
-  def test_do_with_do_fn_returning_string_raises_warning(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', ['2', '9', '3'])
-    pcoll | df.FlatMap('do', lambda x: x + '1')
-
-    # Since the DoFn directly returns a string, we should get an error
-    # warning us when the pipeline runs.
-    with self.assertRaises(typehints.TypeCheckError) as cm:
-      pipeline.run()
-
-    expected_error_prefix = ('Returning a str from a ParDo or FlatMap '
-                             'is discouraged.')
-    self.assertStartswith(cm.exception.message, expected_error_prefix)
-
-  def test_do_with_do_fn_returning_dict_raises_warning(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', ['2', '9', '3'])
-    pcoll | df.FlatMap('do', lambda x: {x: '1'})
-
-    # Since the DoFn directly returns a dict, we should get an error
-    # warning us when the pipeline runs.
-    with self.assertRaises(typehints.TypeCheckError) as cm:
-      pipeline.run()
-
-    expected_error_prefix = ('Returning a dict from a ParDo or FlatMap '
-                             'is discouraged.')
-    self.assertStartswith(cm.exception.message, expected_error_prefix)
-
-  def test_do_with_side_outputs_maintains_unique_name(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
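-    # Both FlatMaps use the same output tag via with_outputs(main='m'); each
-    # result exposes its main output as the `.m` attribute, while the labels
-    # keep the two transforms distinct.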
-    r1 = pcoll | df.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
-    r2 = pcoll | df.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
-    assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
-    assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
-    pipeline.run()
-
-  def test_do_requires_do_fn_returning_iterable(self):
-    # This function is incorrect because it returns an object that isn't an
-    # iterable.
-    def incorrect_par_do_fn(x):
-      return x + 5
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [2, 9, 3])
-    pcoll | df.FlatMap('do', incorrect_par_do_fn)
-    # It's a requirement that all user-defined functions passed to a ParDo
-    # return an iterable.
-    with self.assertRaises(typehints.TypeCheckError) as cm:
-      pipeline.run()
-
-    expected_error_prefix = 'FlatMap and ParDo must return an iterable.'
-    self.assertStartswith(cm.exception.message, expected_error_prefix)
-
-  def test_do_fn_with_start_finish(self):
-    class MyDoFn(df.DoFn):
-      def start_bundle(self, c):
-        yield 'start'
-      def process(self, c):
-        pass
-      def finish_bundle(self, c):
-        yield 'finish'
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3])
-    result = pcoll | df.ParDo('do', MyDoFn())
-
-    # May have many bundles, but each has a start and finish.
-    def matcher():
-      def match(actual):
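-        # The distinct values must be exactly {'start', 'finish'}, and both
-        # must occur the same number of times (one pair per bundle).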
-        equal_to(['start', 'finish'])(list(set(actual)))
-        equal_to([actual.count('start')])([actual.count('finish')])
-      return match
-
-    assert_that(result, matcher())
-    pipeline.run()
-
-  def test_filter(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [1, 2, 3, 4])
-    result = pcoll | df.Filter(
-        'filter', lambda x: x % 2 == 0)
-    assert_that(result, equal_to([2, 4]))
-    pipeline.run()
-
-  class _MeanCombineFn(df.CombineFn):
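-    # Tracks a (sum, count) pair; e.g. for [1, 2, 3] the accumulator evolves
-    # (0, 0) -> (1, 1) -> (3, 2) -> (6, 3), and extract_output yields 2.0.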
-
-    def create_accumulator(self):
-      return (0, 0)
-
-    def add_input(self, (sum_, count), element):
-      return sum_ + element, count + 1
-
-    def merge_accumulators(self, accumulators):
-      sums, counts = zip(*accumulators)
-      return sum(sums), sum(counts)
-
-    def extract_output(self, (sum_, count)):
-      if not count:
-        return float('nan')
-      return sum_ / float(count)
-
-  def test_combine_with_combine_fn(self):
-    vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', vals)
-    result = pcoll | df.CombineGlobally('mean', self._MeanCombineFn())
-    assert_that(result, equal_to([sum(vals) / len(vals)]))
-    pipeline.run()
-
-  def test_combine_with_callable(self):
-    vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', vals)
-    result = pcoll | df.CombineGlobally(sum)
-    assert_that(result, equal_to([sum(vals)]))
-    pipeline.run()
-
-  def test_combine_with_side_input_as_arg(self):
-    values = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', values)
-    divisor = pipeline | df.Create('divisor', [2])
-    result = pcoll | df.CombineGlobally(
-        'max',
-        # Multiples of divisor only.
-        lambda vals, d: max(v for v in vals if v % d == 0),
-        pvalue.AsSingleton(divisor)).without_defaults()
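-    # without_defaults() suppresses the default output for an empty input;
-    # max() would have no meaningful default here.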
-    filt_vals = [v for v in values if v % 2 == 0]
-    assert_that(result, equal_to([max(filt_vals)]))
-    pipeline.run()
-
-  def test_combine_per_key_with_combine_fn(self):
-    vals_1 = [1, 2, 3, 4, 5, 6, 7]
-    vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
-                                           [('b', x) for x in vals_2]))
-    result = pcoll | df.CombinePerKey('mean', self._MeanCombineFn())
-    assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
-                                  ('b', sum(vals_2) / len(vals_2))]))
-    pipeline.run()
-
-  def test_combine_per_key_with_callable(self):
-    vals_1 = [1, 2, 3, 4, 5, 6, 7]
-    vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
-                                           [('b', x) for x in vals_2]))
-    result = pcoll | df.CombinePerKey(sum)
-    assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
-    pipeline.run()
-
-  def test_combine_per_key_with_side_input_as_arg(self):
-    vals_1 = [1, 2, 3, 4, 5, 6, 7]
-    vals_2 = [2, 4, 6, 8, 10, 12, 14]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', ([('a', x) for x in vals_1] +
-                                           [('b', x) for x in vals_2]))
-    divisor = pipeline | df.Create('divisor', [2])
-    result = pcoll | df.CombinePerKey(
-        lambda vals, d: max(v for v in vals if v % d == 0),
-        pvalue.AsSingleton(divisor))  # Multiples of divisor only.
-    m_1 = max(v for v in vals_1 if v % 2 == 0)
-    m_2 = max(v for v in vals_2 if v % 2 == 0)
-    assert_that(result, equal_to([('a', m_1), ('b', m_2)]))
-    pipeline.run()
-
-  def test_group_by_key(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create(
-        'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
-    result = pcoll | df.GroupByKey('group')
-    assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
-    pipeline.run()
-
-  def test_partition_with_partition_fn(self):
-
-    class SomePartitionFn(df.PartitionFn):
-
-      def partition_for(self, context, num_partitions, offset):
-        return (context.element % 3) + offset
-
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
-    # Attempt nominal partition operation.
-    partitions = pcoll | df.Partition('part1', SomePartitionFn(), 4, 1)
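-    # With offset 1, elements land in partitions 1 through 3, leaving
-    # partition 0 empty.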
-    assert_that(partitions[0], equal_to([]))
-    assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
-    assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
-    assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
-    pipeline.run()
-
-    # Check that an out-of-range partition number yields an error. For the
-    # DirectPipelineRunner, this error manifests as an exception.
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
-    partitions = pcoll | df.Partition('part2', SomePartitionFn(), 4, 10000)
-    with self.assertRaises(ValueError):
-      pipeline.run()
-
-  def test_partition_with_callable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
-    partitions = (
-        pcoll | df.Partition(
-            'part',
-            lambda e, n, offset: (e % 3) + offset, 4,
-            1))
-    assert_that(partitions[0], equal_to([]))
-    assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
-    assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
-    assert_that(partitions[3], equal_to([2, 5, 8]), label='p3')
-    pipeline.run()
-
-  def test_partition_followed_by_flatten_and_groupbykey(self):
-    """Regression test for an issue with how partitions are handled."""
-    pipeline = Pipeline('DirectPipelineRunner')
-    contents = [('aa', 1), ('bb', 2), ('aa', 2)]
-    created = pipeline | df.Create('A', contents)
-    partitioned = created | df.Partition('B', lambda x, n: len(x) % n, 3)
-    flattened = partitioned | df.Flatten('C')
-    grouped = flattened | df.GroupByKey('D')
-    assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
-    pipeline.run()
-
-  def test_flatten_pcollections(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
-    result = (pcoll_1, pcoll_2) | df.Flatten('flatten')
-    assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
-    pipeline.run()
-
-  def test_flatten_no_pcollections(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    with self.assertRaises(ValueError):
-      () | df.Flatten('pipeline arg missing')
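-    # With no input PCollections the pipeline cannot be inferred, so it must
-    # be passed explicitly.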
-    result = () | df.Flatten('empty', pipeline=pipeline)
-    assert_that(result, equal_to([]))
-    pipeline.run()
-
-  def test_flatten_pcollections_in_iterable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | df.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | df.Create('start_2', [4, 5, 6, 7])
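-    # Pass a list rather than a tuple to exercise Flatten with a generic
-    # iterable of PCollections.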
-    result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
-              | df.Flatten('flatten'))
-    assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
-    pipeline.run()
-
-  def test_flatten_input_type_must_be_iterable(self):
-    # Inputs to flatten *must* be an iterable.
-    with self.assertRaises(ValueError):
-      4 | df.Flatten('flatten')
-
-  def test_flatten_input_type_must_be_iterable_of_pcolls(self):
-    # Inputs to flatten *must* be an iterable of PCollections.
-    with self.assertRaises(TypeError):
-      {'l': 'test'} | df.Flatten('flatten')
-    with self.assertRaises(TypeError):
-      set([1, 2, 3]) | df.Flatten('flatten')
-
-  def test_co_group_by_key_on_list(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | df.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | df.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = (pcoll_1, pcoll_2) | df.CoGroupByKey('cgbk')
-    assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
-                                  ('b', ([3], [])),
-                                  ('c', ([4], [7, 8]))]))
-    pipeline.run()
-
-  def test_co_group_by_key_on_iterable(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | df.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | df.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = ([pc for pc in (pcoll_1, pcoll_2)]
-              | df.CoGroupByKey('cgbk'))
-    assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
-                                  ('b', ([3], [])),
-                                  ('c', ([4], [7, 8]))]))
-    pipeline.run()
-
-  def test_co_group_by_key_on_dict(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | df.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | df.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = {'X': pcoll_1, 'Y': pcoll_2} | df.CoGroupByKey('cgbk')
-    assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
-                                  ('b', {'X': [3], 'Y': []}),
-                                  ('c', {'X': [4], 'Y': [7, 8]})]))
-    pipeline.run()
-
-  def test_group_by_key_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | df.Create('A', [1, 2, 3, 4, 5])
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      pcolls | df.GroupByKey('D')
-      pipeline.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        'Runtime type violation detected within '
-        'ParDo(D/reify_windows): Input to GroupByKey must be '
-        'a PCollection with elements compatible with KV[A, B]')
-
-  def test_group_by_key_only_input_must_be_kv_pairs(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | df.Create('A', ['a', 'b', 'f'])
-    with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | df.GroupByKeyOnly('D')
-      pipeline.run()
-
-    expected_error_prefix = ('Input to GroupByKeyOnly must be a PCollection of '
-                             'windowed key-value pairs.')
-    self.assertStartswith(cm.exception.message, expected_error_prefix)
-
-  def test_keys_and_values(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create(
-        'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
-    keys = pcoll.apply('keys', df.Keys())
-    vals = pcoll.apply('vals', df.Values())
-    assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
-    assert_that(vals, equal_to([1, 1, 1, 2, 2, 3]), label='assert:vals')
-    pipeline.run()
-
-  def test_kv_swap(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create(
-        'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
-    result = pcoll.apply('swap', df.KvSwap())
-    assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
-    pipeline.run()
-
-  def test_remove_duplicates(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create(
-        'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
-    result = pcoll.apply('nodupes', df.RemoveDuplicates())
-    assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-    pipeline.run()
-
-  def test_chained_ptransforms(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    t = (df.Map(lambda x: (x, 1))
-         | df.GroupByKey()
-         | df.Map(lambda (x, ones): (x, sum(ones))))
-    result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
-    assert_that(result, equal_to([('a', 2), ('b', 1)]))
-    pipeline.run()
-
-  def test_apply_to_list(self):
-    self.assertEqual([1, 2, 3], [0, 1, 2] | df.Map('add_one', lambda x: x + 1))
-    self.assertEqual([1], [0, 1, 2] | df.Filter('odd', lambda x: x % 2))
-    self.assertEqual([1, 2, 3, 100],
-                     ([1, 2, 3], [100]) | df.Flatten('flat'))
-    join_input = ([('k', 'a')],
-                  [('k', 'b'), ('k', 'c')])
-    self.assertEqual([('k', (['a'], ['b', 'c']))],
-                     join_input | df.CoGroupByKey('join'))
-
-  def test_multi_input_ptransform(self):
-    class DisjointUnion(PTransform):
-      def apply(self, pcollections):
-        return (pcollections
-                | df.Flatten()
-                | df.Map(lambda x: (x, None))
-                | df.GroupByKey()
-                | df.Map(lambda (x, _): x))
-    self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
-
-  def test_apply_to_crazy_pvaluish(self):
-    class NestedFlatten(PTransform):
-      """A PTransform taking and returning nested PValueish.
-
-      Takes as input a list of dicts, and returns a dict with the corresponding
-      values flattened.
-      """
-      def _extract_input_pvalues(self, pvalueish):
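-        # Materialize the input and return it together with a flat list of
-        # every PCollection it contains.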
-        pvalueish = list(pvalueish)
-        return pvalueish, sum([list(p.values()) for p in pvalueish], [])
-      def apply(self, pcoll_dicts):
-        keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
-        res = {}
-        for k in keys:
-          res[k] = [p[k] for p in pcoll_dicts if k in p] | df.Flatten(k)
-        return res
-    res = [{'a': [1, 2, 3]},
-           {'a': [4, 5, 6], 'b': ['x', 'y', 'z']},
-           {'a': [7, 8], 'b': ['x', 'y'], 'c': []}] | NestedFlatten()
-    self.assertEqual(3, len(res))
-    self.assertEqual([1, 2, 3, 4, 5, 6, 7, 8], sorted(res['a']))
-    self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b']))
-    self.assertEqual([], sorted(res['c']))
-
-
-@df.ptransform_fn
-def SamplePTransform(label, pcoll, context, *args, **kwargs):
-  """Sample transform using the @ptransform_fn decorator."""
-  _ = label, args, kwargs
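-  # Build the sub-transforms up front and record them in `context` so tests
-  # can inspect them after application.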
-  map_transform = df.Map('ToPairs', lambda v: (v, None))
-  combine_transform = df.CombinePerKey('Group', lambda vs: None)
-  keys_transform = df.Keys('RemoveDuplicates')
-  context.extend([map_transform, combine_transform, keys_transform])
-  return pcoll | map_transform | combine_transform | keys_transform
-
-
-class PTransformLabelsTest(unittest.TestCase):
-
-  class CustomTransform(df.PTransform):
-
-    pardo = None
-
-    def apply(self, pcoll):
-      self.pardo = df.FlatMap('*do*', lambda x: [x + 1])
-      return pcoll | self.pardo
-
-  def test_chained_ptransforms(self):
-    """Tests that chaining gets proper nesting."""
-    pipeline = Pipeline('DirectPipelineRunner')
-    map1 = df.Map('map1', lambda x: (x, 1))
-    gbk = df.GroupByKey('gbk')
-    map2 = df.Map('map2', lambda (x, ones): (x, sum(ones)))
-    t = (map1 | gbk | map2)
-    result = pipeline | df.Create('start', ['a', 'a', 'b']) | t
-    self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
-    self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
-    self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
-    assert_that(result, equal_to([('a', 2), ('b', 1)]))
-    pipeline.run()
-
-  def test_apply_custom_transform_without_label(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
-    custom = PTransformLabelsTest.CustomTransform()
-    result = pipeline.apply(custom, pcoll)
-    self.assertTrue('CustomTransform' in pipeline.applied_labels)
-    self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels)
-    assert_that(result, equal_to([2, 3, 4]))
-    pipeline.run()
-
-  def test_apply_custom_transform_with_label(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
-    custom = PTransformLabelsTest.CustomTransform('*custom*')
-    result = pipeline.apply(custom, pcoll)
-    self.assertTrue('*custom*' in pipeline.applied_labels)
-    self.assertTrue('*custom*/*do*' in pipeline.applied_labels)
-    assert_that(result, equal_to([2, 3, 4]))
-    pipeline.run()
-
-  def test_combine_without_label(self):
-    vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', vals)
-    combine = df.CombineGlobally(sum)
-    result = pcoll | combine
-    self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
-    assert_that(result, equal_to([sum(vals)]))
-    pipeline.run()
-
-  def test_apply_ptransform_using_decorator(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('pcoll', [1, 2, 3])
-    context = []
-    sample = SamplePTransform('*sample*', context)
-    _ = pcoll | sample
-    self.assertTrue('*sample*' in pipeline.applied_labels)
-    self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)
-    self.assertTrue('*sample*/Group' in pipeline.applied_labels)
-    self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels)
-
-  def test_combine_with_label(self):
-    vals = [1, 2, 3, 4, 5, 6, 7]
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | df.Create('start', vals)
-    combine = df.CombineGlobally('*sum*', sum)
-    result = pcoll | combine
-    self.assertTrue('*sum*' in pipeline.applied_labels)
-    assert_that(result, equal_to([sum(vals)]))
-    pipeline.run()
-
-  def check_label(self, ptransform, expected_label):
-    pipeline = Pipeline('DirectPipelineRunner')
-    pipeline | df.Create('start', [('a', 1)]) | ptransform
-    actual_label = sorted(pipeline.applied_labels - {'start'})[0]
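-    # Runs of 3+ digits (e.g. lambda line numbers) are normalized to '#' so
-    # the expected labels stay stable as this file changes.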
-    self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
-
-  def test_default_labels(self):
-    self.check_label(df.Map(len), r'Map(len)')
-    self.check_label(df.Map(lambda x: x),
-                     r'Map(<lambda at ptransform_test.py:#>)')
-    self.check_label(df.FlatMap(list), r'FlatMap(list)')
-    self.check_label(df.Filter(sum), r'Filter(sum)')
-    self.check_label(df.CombineGlobally(sum), r'CombineGlobally(sum)')
-    self.check_label(df.CombinePerKey(sum), r'CombinePerKey(sum)')
-
-    class MyDoFn(df.DoFn):
-      def process(self, context):
-        pass
-
-    self.check_label(df.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
-
-
-class PTransformTypeCheckTestCase(TypeHintTestCase):
-
-  def assertStartswith(self, msg, prefix):
-    self.assertTrue(msg.startswith(prefix),
-                    '"%s" does not start with "%s"' % (msg, prefix))
-
-  def setUp(self):
-    self.p = Pipeline(options=PipelineOptions([]))
-
-  def test_do_fn_pipeline_pipeline_type_check_satisfied(self):
-    @with_input_types(int, int)
-    @with_output_types(typehints.List[int])
-    class AddWithFive(df.DoFn):
-      def process(self, context, five):
-        return [context.element + five]
-
-    d = (self.p
-         | df.Create('t', [1, 2, 3]).with_output_types(int)
-         | df.ParDo('add', AddWithFive(), 5))
-
-    assert_that(d, equal_to([6, 7, 8]))
-    self.p.run()
-
-  def test_do_fn_pipeline_pipeline_type_check_violated(self):
-    @with_input_types(str, str)
-    @with_output_types(typehints.List[str])
-    class ToUpperCaseWithPrefix(df.DoFn):
-      def process(self, context, prefix):
-        return [prefix + context.element.upper()]
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('t', [1, 2, 3]).with_output_types(int)
-           | df.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
-
-    self.assertEqual("Type hint violation for 'upper': "
-                     "requires <type 'str'> but got <type 'int'> for context",
-                     e.exception.message)
-
-  def test_do_fn_pipeline_runtime_type_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_input_types(int, int)
-    @with_output_types(int)
-    class AddWithNum(df.DoFn):
-      def process(self, context, num):
-        return [context.element + num]
-
-    d = (self.p
-         | df.Create('t', [1, 2, 3]).with_output_types(int)
-         | df.ParDo('add', AddWithNum(), 5))
-
-    assert_that(d, equal_to([6, 7, 8]))
-    self.p.run()
-
-  def test_do_fn_pipeline_runtime_type_check_violated(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_input_types(int, int)
-    @with_output_types(typehints.List[int])
-    class AddWithNum(df.DoFn):
-      def process(self, context, num):
-        return [context.element + num]
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('t', ['1', '2', '3']).with_output_types(str)
-           | df.ParDo('add', AddWithNum(), 5))
-      self.p.run()
-
-    self.assertEqual("Type hint violation for 'add': "
-                     "requires <type 'int'> but got <type 'str'> for context",
-                     e.exception.message)
-
-  def test_pardo_does_not_type_check_using_type_hint_decorators(self):
-    @with_input_types(a=int)
-    @with_output_types(typehints.List[str])
-    def int_to_str(a):
-      return [str(a)]
-
-    # The function above is expecting an int for its only parameter. However, it
-    # will receive a str instead, which should result in a raised exception.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      c = (self.p
-           | df.Create('s', ['b', 'a', 'r']).with_output_types(str)
-           | df.FlatMap('to str', int_to_str))
-
-    self.assertEqual("Type hint violation for 'to str': "
-                     "requires <type 'int'> but got <type 'str'> for a",
-                     e.exception.message)
-
-  def test_pardo_properly_type_checks_using_type_hint_decorators(self):
-    @with_input_types(a=str)
-    @with_output_types(typehints.List[str])
-    def to_all_upper_case(a):
-      return [a.upper()]
-
-    # If this type-checks, then no error should be raised.
-    d = (self.p
-         | df.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
-         | df.FlatMap('case', to_all_upper_case))
-    assert_that(d, equal_to(['T', 'E', 'S', 'T']))
-    self.p.run()
-
-    # Output type should have been recognized as 'str' rather than List[str] to
-    # do the flatten part of FlatMap.
-    self.assertEqual(str, d.element_type)
-
-  def test_pardo_does_not_type_check_using_type_hint_methods(self):
-    # The first ParDo outputs a PCollection of ints, but the second ParDo
-    # expects a PCollection of strs instead.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-       | (df.FlatMap('score', lambda x: [1] if x == 't' else [2])
-          .with_input_types(str).with_output_types(int))
-       | (df.FlatMap('upper', lambda x: [x.upper()])
-          .with_input_types(str).with_output_types(str)))
-
-    self.assertEqual("Type hint violation for 'upper': "
-                     "requires <type 'str'> but got <type 'int'> for x",
-                     e.exception.message)
-
-  def test_pardo_properly_type_checks_using_type_hint_methods(self):
-    # Pipeline should be created successfully without an error
-    d = (self.p
-         | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | df.FlatMap('dup', lambda x: [x + x])
-            .with_input_types(str).with_output_types(str)
-         | df.FlatMap('upper', lambda x: [x.upper()])
-            .with_input_types(str).with_output_types(str))
-
-    assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
-    self.p.run()
-
-  def test_map_does_not_type_check_using_type_hints_methods(self):
-    # The transform before 'Map' has indicated that it outputs a PCollection
-    # of ints, while 'Map' expects one of strs.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
-           | df.Map('upper', lambda x: x.upper())
-              .with_input_types(str).with_output_types(str))
-
-    self.assertEqual("Type hint violation for 'upper': "
-                     "requires <type 'str'> but got <type 'int'> for x",
-                     e.exception.message)
-
-  def test_map_properly_type_checks_using_type_hints_methods(self):
-    # No error should be raised if this type-checks properly.
-    d = (self.p
-         | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
-         | df.Map('to_str', lambda x: str(x))
-            .with_input_types(int).with_output_types(str))
-    assert_that(d, equal_to(['1', '2', '3', '4']))
-    self.p.run()
-
-  def test_map_does_not_type_check_using_type_hints_decorator(self):
-    @with_input_types(s=str)
-    @with_output_types(str)
-    def upper(s):
-      return s.upper()
-
-    # Hinted function above expects a str at pipeline construction.
-    # However, 'Map' should detect that Create has hinted an int instead.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('s', [1, 2, 3, 4]).with_output_types(int)
-           | df.Map('upper', upper))
-
-    self.assertEqual("Type hint violation for 'upper': "
-                     "requires <type 'str'> but got <type 'int'> for s",
-                     e.exception.message)
-
-  def test_map_properly_type_checks_using_type_hints_decorator(self):
-    @with_input_types(a=bool)
-    @with_output_types(int)
-    def bool_to_int(a):
-      return int(a)
-
-    # If this type-checks, then no error should be raised.
-    d = (self.p
-         | df.Create('bools', [True, False, True]).with_output_types(bool)
-         | df.Map('to_ints', bool_to_int))
-    assert_that(d, equal_to([1, 0, 1]))
-    self.p.run()
-
-  def test_filter_does_not_type_check_using_type_hints_method(self):
-    # Filter is expecting an int but instead looks to the 'left' and sees a str
-    # incoming.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-       | df.Map('lower', lambda x: x.lower())
-          .with_input_types(str).with_output_types(str)
-       | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
-
-    self.assertEqual("Type hint violation for 'below 3': "
-                     "requires <type 'int'> but got <type 'str'> for x",
-                     e.exception.message)
-
-  def test_filter_type_checks_using_type_hints_method(self):
-    # No error should be raised if this type-checks properly.
-    d = (self.p
-         | df.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-         | df.Map('to int', lambda x: int(x))
-            .with_input_types(str).with_output_types(int)
-         | df.Filter('below 3', lambda x: x < 3).with_input_types(int))
-    assert_that(d, equal_to([1, 2]))
-    self.p.run()
-
-  def test_filter_does_not_type_check_using_type_hints_decorator(self):
-    @with_input_types(a=float)
-    def more_than_half(a):
-      return a > 0.50
-
-    # The function above is hinted to take only a float, yet an int will
-    # be passed.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('ints', [1, 2, 3, 4]).with_output_types(int)
-           | df.Filter('half', more_than_half))
-
-    self.assertEqual("Type hint violation for 'half': "
-                     "requires <type 'float'> but got <type 'int'> for a",
-                     e.exception.message)
-
-  def test_filter_type_checks_using_type_hints_decorator(self):
-    @with_input_types(b=int)
-    def half(b):
-      import random
-      return bool(random.choice([0, 1]))
-
-    # Filter should deduce that it returns the same type that it takes.
-    (self.p
-     | df.Create('str', range(5)).with_output_types(int)
-     | df.Filter('half', half)
-     | df.Map('to bool', lambda x: bool(x))
-        .with_input_types(int).with_output_types(bool))
-
-  def test_group_by_key_only_output_type_deduction(self):
-    d = (self.p
-         | df.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
-         | (df.Map('pair', lambda x: (x, ord(x)))
-            .with_output_types(typehints.KV[str, str]))
-         | df.GroupByKeyOnly('O'))
-
-    # Output type should correctly be deduced.
-    # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
-    self.assertCompatible(typehints.KV[str, typehints.Iterable[str]],
-                          d.element_type)
-
-  def test_group_by_key_output_type_deduction(self):
-    d = (self.p
-         | df.Create('str', range(20)).with_output_types(int)
-         | (df.Map('pair negative', lambda x: (x % 5, -x))
-            .with_output_types(typehints.KV[int, int]))
-         | df.GroupByKey('T'))
-
-    # Output type should correctly be deduced.
-    # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
-    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
-                          d.element_type)
-
-  def test_group_by_key_only_does_not_type_check(self):
-    # GBK will be passed raw ints here instead of some form of KV[A, B].
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | df.Create('s', [1, 2, 3]).with_output_types(int)
-           | df.GroupByKeyOnly('F'))
-
-    self.assertEqual("Input type hint violation at F: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
-
-  def test_group_by_does_not_type_check(self):
-    # Create is hinted to return an Iterable[int], rather than the KV[A, B]
-    # (an alias for Tuple[A, B]) that GroupByKey requires.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      d = (self.p
-           | (df.Create('s', range(5))
-              .with_output_types(typehints.Iterable[int]))
-           | df.GroupByKey('T'))
-
-    self.assertEqual("Input type hint violation at T: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got Iterable[int]",
-                     e.exception.message)
-
-  def test_pipeline_checking_pardo_insufficient_type_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
-
-    # Type checking is enabled, but 'Create' doesn't pass on any relevant type
-    # information to the ParDo.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('nums', range(5))
-       | df.FlatMap('mod dup', lambda x: (x % 2, x)))
-
-    self.assertEqual('Pipeline type checking is enabled, however no output '
-                     'type-hint was found for the PTransform Create(nums)',
-                     e.exception.message)
-
-  def test_pipeline_checking_gbk_insufficient_type_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
-    # Type checking is enabled, but 'Map' doesn't pass on any relevant type
-    # information to GBK-only.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('nums', range(5)).with_output_types(int)
-       | df.Map('mod dup', lambda x: (x % 2, x))
-       | df.GroupByKeyOnly('G'))
-
-    self.assertEqual('Pipeline type checking is enabled, however no output '
-                     'type-hint was found for the PTransform '
-                     'ParDo(mod dup)',
-                     e.exception.message)
-
-  def test_disable_pipeline_type_check(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    # The pipeline below should raise a TypeError, however pipeline type
-    # checking was disabled above.
-    (self.p
-     | df.Create('t', [1, 2, 3]).with_output_types(int)
-     | df.Map('lower', lambda x: x.lower())
-        .with_input_types(str).with_output_types(str))
-
-  def test_run_time_type_checking_enabled_type_violation(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_output_types(str)
-    @with_input_types(x=int)
-    def int_to_string(x):
-      return str(x)
-
-    # Function above has been type-hinted to only accept an int. But in the
-    # pipeline execution it'll be passed a string due to the output of Create.
-    (self.p
-     | df.Create('t', ['some_string'])
-     | df.Map('to str', int_to_string))
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(to str): "
-        "Type-hint for argument: 'x' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found some_string, an instance of <type 'str'>.")
-
-  def test_run_time_type_checking_enabled_types_satisfied(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_output_types(typehints.KV[int, str])
-    @with_input_types(x=str)
-    def group_with_upper_ord(x):
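-      # e.g. 't' maps to (ord('T') % 5, 't') == (4, 't').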
-      return (ord(x.upper()) % 5, x)
-
-    # Pipeline checking is off, but the above function should satisfy types at
-    # run-time.
-    result = (self.p
-              | df.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g'])
-                 .with_output_types(str)
-              | df.Map('gen keys', group_with_upper_ord)
-              | df.GroupByKey('O'))
-
-    assert_that(result, equal_to([(1, ['g']),
-                                  (3, ['s', 'i', 'n']),
-                                  (4, ['t', 'e', 't'])]))
-    self.p.run()
-
-  def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_output_types(typehints.KV[bool, int])
-    @with_input_types(a=int)
-    def is_even_as_key(a):
-      # Simulate a programming error, should be: return (a % 2 == 0, a)
-      # However this returns KV[int, int]
-      return (a % 2, a)
-
-    (self.p
-     | df.Create('nums', range(5)).with_output_types(int)
-     | df.Map('is even', is_even_as_key)
-     | df.GroupByKey('parity'))
-
-    # Although all the types appear to be correct when checked at pipeline
-    # construction, runtime type-checking should detect that 'is_even_as_key'
-    # is returning Tuple[int, int], instead of Tuple[bool, int].
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(is even): "
-        "Tuple[bool, int] hint type-constraint violated. "
-        "The type of element #0 in the passed tuple is incorrect. "
-        "Expected an instance of type bool, "
-        "instead received an instance of type int.")
-
-  def test_pipeline_checking_satisfied_run_time_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    @with_output_types(typehints.KV[bool, int])
-    @with_input_types(a=int)
-    def is_even_as_key(a):
-      # The programming error in the above test-case has now been fixed.
-      # Everything should properly type-check.
-      return (a % 2 == 0, a)
-
-    result = (self.p
-              | df.Create('nums', range(5)).with_output_types(int)
-              | df.Map('is even', is_even_as_key)
-              | df.GroupByKey('parity'))
-
-    assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
-    self.p.run()
-
-  def test_pipeline_runtime_checking_violation_simple_type_input(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    # The type-hint applied via the 'with_input_types()' method indicates the
-    # ParDo should receive an instance of type 'str', however an 'int' will be
-    # passed instead.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | df.Create('n', [1, 2, 3])
-       | (df.FlatMap('to int', lambda x: [int(x)])
-          .with_input_types(str).with_output_types(int))
-      )
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(to int): "
-        "Type-hint for argument: 'x' violated. "
-        "Expected an instance of <type 'str'>, "
-        "instead found 1, an instance of <type 'int'>.")
-
-  def test_pipeline_runtime_checking_violation_composite_type_input(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (df.FlatMap('add', lambda (x, y): [x + y])
-          .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
-      )
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(add): "
-        "Type-hint for argument: 'y' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 3.0, an instance of <type 'float'>.")
-
-  def test_pipeline_runtime_checking_violation_simple_type_output(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    # The type-hint applied via the 'with_output_types()' method indicates the
-    # ParDo should output an instance of type 'int', however a 'float' will be
-    # generated instead.
-    print "HINTS", df.FlatMap(
-        'to int',
-        lambda x: [float(x)]).with_input_types(int).with_output_types(
-            int).get_type_hints()
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | df.Create('n', [1, 2, 3])
-       | (df.FlatMap('to int', lambda x: [float(x)])
-          .with_input_types(int).with_output_types(int))
-      )
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within "
-        "ParDo(to int): "
-        "According to type-hint expected output should be "
-        "of type <type 'int'>. Instead, received '1.0', "
-        "an instance of type <type 'float'>.")
-
-  def test_pipeline_runtime_checking_violation_composite_type_output(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    # The type-hint applied via the 'with_output_types()' method indicates the
-    # ParDo should return an instance of type Tuple[float, int]. However, an
-    # instance of 'float' will be generated instead.
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (df.FlatMap('swap', lambda (x, y): [x + y])
-          .with_input_types(typehints.Tuple[int, float])
-          .with_output_types(typehints.Tuple[float, int]))
-      )
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within "
-        "ParDo(swap): Tuple type constraint violated. "
-        "Valid object instance must be of type 'tuple'. Instead, "
-        "an instance of 'float' was received.")
-
-  def test_pipeline_runtime_checking_violation_with_side_inputs_decorator(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    @with_output_types(int)
-    @with_input_types(a=int, b=int)
-    def add(a, b):
-      return a + b
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | df.Create('t', [1, 2, 3, 4]) | df.Map('add 1', add, 1.0))
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(add 1): "
-        "Type-hint for argument: 'b' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 1.0, an instance of <type 'float'>.")
-
-  def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('t', [1, 2, 3, 4])
-       | (df.Map('add 1', lambda x, one: x + one, 1.0)
-          .with_input_types(int, int)
-          .with_output_types(float)))
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within ParDo(add 1): "
-        "Type-hint for argument: 'one' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 1.0, an instance of <type 'float'>.")
-
-  def test_combine_properly_pipeline_type_checks_using_decorator(self):
-    @with_output_types(int)
-    @with_input_types(ints=typehints.Iterable[int])
-    def sum_ints(ints):
-      return sum(ints)
-
-    d = (self.p
-         | df.Create('t', [1, 2, 3]).with_output_types(int)
-         | df.CombineGlobally('sum', sum_ints))
-
-    self.assertEqual(int, d.element_type)
-    assert_that(d, equal_to([6]))
-    self.p.run()
-
-  def test_combine_func_type_hint_does_not_take_iterable_using_decorator(self):
-    @with_output_types(int)
-    @with_input_types(a=int)
-    def bad_combine(a):
-      return 5 + a
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('m', [1, 2, 3]).with_output_types(int)
-       | df.CombineGlobally('add', bad_combine))
-
-    self.assertEqual(
-        "All functions for a Combine PTransform must accept a "
-        "single argument compatible with: Iterable[Any]. "
-        "Instead a function with input type: <type 'int'> was received.",
-        e.exception.message)
-
-  def test_combine_pipeline_type_propagation_using_decorators(self):
-    @with_output_types(int)
-    @with_input_types(ints=typehints.Iterable[int])
-    def sum_ints(ints):
-      return sum(ints)
-
-    @with_output_types(typehints.List[int])
-    @with_input_types(n=int)
-    def range_from_zero(n):
-      return list(range(n+1))
-
-    d = (self.p
-         | df.Create('t', [1, 2, 3]).with_output_types(int)
-         | df.CombineGlobally('sum', sum_ints)
-         | df.ParDo('range', range_from_zero))
-
-    self.assertEqual(int, d.element_type)
-    assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
-    self.p.run()
-
-  def test_combine_runtime_type_check_satisfied_using_decorators(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-
-    @with_output_types(int)
-    @with_input_types(ints=typehints.Iterable[int])
-    def iter_mul(ints):
-      return reduce(operator.mul, ints, 1)
-
-    d = (self.p
-         | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
-         | df.CombineGlobally('mul', iter_mul))
-
-    assert_that(d, equal_to([625]))
-    self.p.run()
-
-  def test_combine_runtime_type_check_violation_using_decorators(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    # Combine fn is returning the incorrect type
-    @with_output_types(int)
-    @with_input_types(ints=typehints.Iterable[int])
-    def iter_mul(ints):
-      return str(reduce(operator.mul, ints, 1))
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('k', [5, 5, 5, 5]).with_output_types(int)
-       | df.CombineGlobally('mul', iter_mul))
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within "
-        "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
-        "Tuple[TypeVariable[K], int] hint type-constraint violated. "
-        "The type of element #1 in the passed tuple is incorrect. "
-        "Expected an instance of type int, "
-        "instead received an instance of type str.")
-
-  def test_combine_pipeline_type_check_using_methods(self):
-    d = (self.p
-         | df.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | (df.CombineGlobally('concat', lambda s: ''.join(s))
-            .with_input_types(str).with_output_types(str)))
-
-    def matcher(expected):
-      def match(actual):
-        equal_to(expected)(list(actual[0]))
-      return match
-    assert_that(d, matcher('estt'))
-    self.p.run()
-
-  def test_combine_runtime_type_check_using_methods(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('s', range(5)).with_output_types(int)
-         | (df.CombineGlobally('sum', lambda s: sum(s))
-            .with_input_types(int).with_output_types(int)))
-
-    assert_that(d, equal_to([10]))
-    self.p.run()
-
-  def test_combine_pipeline_type_check_violation_using_methods(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('e', range(3)).with_output_types(int)
-       | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
-          .with_input_types(str).with_output_types(str)))
-
-    self.assertEqual("Input type hint violation at sort join: "
-                     "expected <type 'str'>, got <type 'int'>",
-                     e.exception.message)
-
-  def test_combine_runtime_type_check_violation_using_methods(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('e', range(3)).with_output_types(int)
-       | (df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
-          .with_input_types(str).with_output_types(str)))
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within "
-        "ParDo(sort join/KeyWithVoid): "
-        "Type-hint for argument: 'v' violated. "
-        "Expected an instance of <type 'str'>, "
-        "instead found 0, an instance of <type 'int'>.")
-
-  def test_combine_insufficient_type_hint_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('e', range(3)).with_output_types(int)
-       | df.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
-       | df.Map('f', lambda x: x + 1))
-
-    self.assertEqual(
-        'Pipeline type checking is enabled, '
-        'however no output type-hint was found for the PTransform '
-        'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
-        e.exception.message)
-
-  def test_mean_globally_pipeline_checking_satisfied(self):
-    d = (self.p
-         | df.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
-
-    self.assertTrue(d.element_type is float)
-    assert_that(d, equal_to([2.0]))
-    self.p.run()
-
-  def test_mean_globally_pipeline_checking_violated(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('c', ['test']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
-
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[None, Iterable[str]] for p_context",
-                     e.exception.message)
-
-  def test_mean_globally_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
-
-    self.assertTrue(d.element_type is float)
-    assert_that(d, equal_to([2.0]))
-    self.p.run()
-
-  def test_mean_globally_runtime_checking_violated(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
-      self.p.run()
-      self.assertEqual("Runtime type violation detected for transform input "
-                       "when executing ParDoFlatMap(Combine): Tuple[Any, "
-                       "Iterable[Union[int, float]]] hint type-constraint "
-                       "violated. The type of element #1 in the passed tuple "
-                       "is incorrect. Iterable[Union[int, float]] hint "
-                       "type-constraint violated. The type of element #0 in "
-                       "the passed Iterable is incorrect: Union[int, float] "
-                       "type-constraint violated. Expected an instance of one "
-                       "of: ('int', 'float'), received str instead.",
-                       e.exception.message)
-
-  def test_mean_per_key_pipeline_checking_satisfied(self):
-    d = (self.p
-         | df.Create('c', range(5)).with_output_types(int)
-         | (df.Map('even group', lambda x: (not x % 2, x))
-            .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('even mean'))
-
-    self.assertCompatible(typehints.KV[bool, float], d.element_type)
-    assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
-    self.p.run()
-
-  def test_mean_per_key_pipeline_checking_violated(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('e', map(str, range(5))).with_output_types(str)
-       | (df.Map('upper pair', lambda x: (x.upper(), x))
-          .with_output_types(typehints.KV[str, str]))
-       | combine.Mean.PerKey('str mean'))
-      self.p.run()
-
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[str, Iterable[str]] for p_context",
-                     e.exception.message)
-
-  def test_mean_per_key_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('c', range(5)).with_output_types(int)
-         | (df.Map('odd group', lambda x: (bool(x % 2), x))
-            .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('odd mean'))
-
-    self.assertCompatible(typehints.KV[bool, float], d.element_type)
-    assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
-    self.p.run()
-
-  def test_mean_per_key_runtime_checking_violated(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('c', range(5)).with_output_types(int)
-       | (df.Map('odd group', lambda x: (x, str(bool(x % 2))))
-          .with_output_types(typehints.KV[int, str]))
-       | combine.Mean.PerKey('odd mean'))
-      self.p.run()
-
-    self.assertStartswith(
-        e.exception.message,
-        "Runtime type violation detected within "
-        "ParDo(odd mean/CombinePerKey(MeanCombineFn)/"
-        "Combine/ParDo(CombineValuesDoFn)): "
-        "Type-hint for argument: 'p_context' violated: "
-        "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
-        " hint type-constraint violated. "
-        "The type of element #1 in the passed tuple is incorrect. "
-        "Iterable[Union[float, int, long]] "
-        "hint type-constraint violated. The type of element #0 "
-        "in the passed Iterable is incorrect: "
-        "Union[float, int, long] type-constraint violated. "
-        "Expected an instance of one of: "
-        "('float', 'int', 'long'), received str instead.")
-
-  def test_count_globally_pipeline_type_checking_satisfied(self):
-    d = (self.p
-         | df.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
-
-    self.assertTrue(d.element_type is int)
-    assert_that(d, equal_to([5]))
-    self.p.run()
-
-  def test_count_globally_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
-
-    self.assertTrue(d.element_type is int)
-    assert_that(d, equal_to([5]))
-    self.p.run()
-
-  def test_count_perkey_pipeline_type_checking_satisfied(self):
-    d = (self.p
-         | df.Create('p', range(5)).with_output_types(int)
-         | (df.Map('even group', lambda x: (not x % 2, x))
-            .with_output_types(typehints.KV[bool, int]))
-         | combine.Count.PerKey('count int'))
-
-    self.assertCompatible(typehints.KV[bool, int], d.element_type)
-    assert_that(d, equal_to([(False, 2), (True, 3)]))
-    self.p.run()
-
-  def test_count_perkey_pipeline_type_checking_violated(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('p', range(5)).with_output_types(int)
-       | combine.Count.PerKey('count int'))
-
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
-
-  def test_count_perkey_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-         | (df.Map('dup key', lambda x: (x, x))
-            .with_output_types(typehints.KV[str, str]))
-         | combine.Count.PerKey('count dups'))
-
-    self.assertCompatible(typehints.KV[str, int], d.element_type)
-    assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
-    self.p.run()
-
-  def test_count_perelement_pipeline_type_checking_satisfied(self):
-    d = (self.p
-         | df.Create('w', [1, 1, 2, 3]).with_output_types(int)
-         | combine.Count.PerElement('count elems'))
-
-    self.assertCompatible(typehints.KV[int, int], d.element_type)
-    assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
-    self.p.run()
-
-  def test_count_perelement_pipeline_type_checking_violated(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
-
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('f', [1, 1, 2, 3])
-       | combine.Count.PerElement('count elems'))
-
-    self.assertEqual('Pipeline type checking is enabled, however no output '
-                     'type-hint was found for the PTransform '
-                     'Create(f)',
-                     e.exception.message)
-
-  def test_count_perelement_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | (df.Create('w', [True, True, False, True, True])
-            .with_output_types(bool))
-         | combine.Count.PerElement('count elems'))
-
-    self.assertCompatible(typehints.KV[bool, int], d.element_type)
-    assert_that(d, equal_to([(False, 1), (True, 4)]))
-    self.p.run()
-
-  def test_top_of_pipeline_checking_satisfied(self):
-    d = (self.p
-         | df.Create('n', range(5, 11)).with_output_types(int)
-         | combine.Top.Of('top 3', 3, lambda x, y: x < y))
-
-    self.assertCompatible(typehints.Iterable[int],
-                          d.element_type)
-    assert_that(d, equal_to([[10, 9, 8]]))
-    self.p.run()
-
-  def test_top_of_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('n', list('testing')).with_output_types(str)
-         | combine.Top.Of('ascii top', 3, lambda x, y: x < y))
-
-    self.assertCompatible(typehints.Iterable[str], d.element_type)
-    assert_that(d, equal_to([['t', 't', 's']]))
-    self.p.run()
-
-  def test_per_key_pipeline_checking_violated(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('n', range(100)).with_output_types(int)
-       | df.Map('num + 1', lambda x: x + 1).with_output_types(int)
-       | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
-
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
-
-  def test_per_key_pipeline_checking_satisfied(self):
-    d = (self.p
-         | df.Create('n', range(100)).with_output_types(int)
-         | (df.Map('group mod 3', lambda x: (x % 3, x))
-            .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
-
-    self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
-                          d.element_type)
-    assert_that(d, equal_to([(0, [99]), (1, [97]), (2, [98])]))
-    self.p.run()
-
-  def test_per_key_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('n', range(21))
-         | (df.Map('group mod 3', lambda x: (x % 3, x))
-            .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
-
-    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
-                          d.element_type)
-    assert_that(d, equal_to([(0, [18]), (1, [19]), (2, [20])]))
-    self.p.run()
-
-  def test_sample_globally_pipeline_satisfied(self):
-    d = (self.p
-         | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 3))
-
-    self.assertCompatible(typehints.Iterable[int], d.element_type)
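-    # Only the sample's size is deterministic, so the matcher checks the
-    # length of the single sampled element rather than its contents.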
-    def matcher(expected_len):
-      def match(actual):
-        equal_to([expected_len])([len(actual[0])])
-      return match
-    assert_that(d, matcher(3))
-    self.p.run()
-
-  def test_sample_globally_runtime_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 2))
-
-    self.assertCompatible(typehints.Iterable[int], d.element_type)
-    def matcher(expected_len):
-      def match(actual):
-        equal_to([expected_len])([len(actual[0])])
-      return match
-    assert_that(d, matcher(2))
-    self.p.run()
-
-  def test_sample_per_key_pipeline_satisfied(self):
-    d = (self.p
-         | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
-            .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 2))
-
-    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
-                          d.element_type)
-    def matcher(expected_len):
-      def match(actual):
-        for _, sample in actual:
-          equal_to([expected_len])([len(sample)])
-      return match
-    assert_that(d, matcher(2))
-    self.p.run()
-
-  def test_sample_per_key_runtime_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | (df.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
-            .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 1))
-
-    self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
-                          d.element_type)
-    def matcher(expected_len):
-      def match(actual):
-        for _, sample in actual:
-          equal_to([expected_len])([len(sample)])
-      return match
-    assert_that(d, matcher(1))
-    self.p.run()
-
-  def test_to_list_pipeline_check_satisfied(self):
-    d = (self.p
-         | df.Create('c', (1, 2, 3, 4)).with_output_types(int)
-         | combine.ToList('to list'))
-
-    self.assertCompatible(typehints.List[int], d.element_type)
-    def matcher(expected):
-      def match(actual):
-        equal_to(expected)(actual[0])
-      return match
-    assert_that(d, matcher([1, 2, 3, 4]))
-    self.p.run()
-
-  def test_to_list_runtime_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | df.Create('c', list('test')).with_output_types(str)
-         | combine.ToList('to list'))
-
-    self.assertCompatible(typehints.List[str], d.element_type)
-    def matcher(expected):
-      def match(actual):
-        equal_to(expected)(actual[0])
-      return match
-    assert_that(d, matcher(['e', 's', 't', 't']))
-    self.p.run()
-
-  def test_to_dict_pipeline_check_violated(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p
-       | df.Create('d', [1, 2, 3, 4]).with_output_types(int)
-       | combine.ToDict('to dict'))
-
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
-                     "but got Tuple[None, Iterable[int]] for p_context",
-                     e.exception.message)
-
-  def test_to_dict_pipeline_check_satisfied(self):
-    d = (self.p
-         | df.Create(
-             'd',
-             [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
-         | combine.ToDict('to dict'))
-
-    self.assertCompatible(typehints.Dict[int, int], d.element_type)
-    assert_that(d, equal_to([{1: 2, 3: 4}]))
-    self.p.run()
-
-  def test_to_dict_runtime_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    d = (self.p
-         | (df.Create('d', [('1', 2), ('3', 4)])
-            .with_output_types(typehints.Tuple[str, int]))
-         | combine.ToDict('to dict'))
-
-    self.assertCompatible(typehints.Dict[str, int], d.element_type)
-    assert_that(d, equal_to([{'1': 2, '3': 4}]))
-    self.p.run()
-
-  def test_runtime_type_check_python_type_error(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-
-    with self.assertRaises(TypeError) as e:
-      (self.p
-       | df.Create('t', [1, 2, 3]).with_output_types(int)
-       | df.Map('len', lambda x: len(x)).with_output_types(int))
-      self.p.run()
-
-    # Our special type-checking related TypeError shouldn't have been raised.
-    # Instead the above pipeline should have triggered a regular Python runtime
-    # TypeError.
-    self.assertEqual("object of type 'int' has no len() [while running 'len']",
-                     e.exception.message)
-    self.assertFalse(isinstance(e.exception, typehints.TypeCheckError))
-
-  def test_pardo_type_inference(self):
-    self.assertEqual(int,
-                     df.Filter(lambda x: False).infer_output_type(int))
-    self.assertEqual(typehints.Tuple[str, int],
-                     df.Map(lambda x: (x, 1)).infer_output_type(str))
-
-  def test_gbk_type_inference(self):
-    self.assertEqual(
-        typehints.Tuple[str, typehints.Iterable[int]],
-        df.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
-
-  def test_pipeline_inference(self):
-    created = self.p | df.Create('c', ['a', 'b', 'c'])
-    mapped = created | df.Map('pair with 1', lambda x: (x, 1))
-    grouped = mapped | df.GroupByKey()
-    self.assertEqual(str, created.element_type)
-    self.assertEqual(typehints.KV[str, int], mapped.element_type)
-    self.assertEqual(typehints.KV[str, typehints.Iterable[int]],
-                     grouped.element_type)
-
-  def test_inferred_bad_kv_type(self):
-    with self.assertRaises(typehints.TypeCheckError) as e:
-      _ = (self.p
-           | df.Create('t', ['a', 'b', 'c'])
-           | df.Map('ungroupable', lambda x: (x, 0, 1.0))
-           | df.GroupByKey())
-
-    self.assertEqual('Input type hint violation at GroupByKey: '
-                     'expected Tuple[TypeVariable[K], TypeVariable[V]], '
-                     'got Tuple[str, int, float]',
-                     e.exception.message)
-
-  def test_type_inference_command_line_flag_toggle(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | df.Create('t', [1, 2, 3, 4])
-    self.assertIsNone(x.element_type)
-
-    self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | df.Create('m', [1, 2, 3, 4])
-    self.assertEqual(int, x.element_type)
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/sideinputs.py b/sdks/python/google/cloud/dataflow/transforms/sideinputs.py
deleted file mode 100644
index b8efe82..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/sideinputs.py
+++ /dev/null
@@ -1,145 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Internal side input transforms and implementations.
-
-Important: this module is an implementation detail and should not be used
-directly by pipeline writers. Instead, users should use the helper methods
-AsSingleton, AsIter, AsList and AsDict in google.cloud.dataflow.pvalue.
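-
-For example, a pipeline writer might feed a side input to a Map (an
-illustrative sketch only; the pipeline p and the labels below are
-hypothetical, not part of this module):
-
-  side = p | df.Create('side', [10])
-  main = p | df.Create('main', [1, 2, 3])
-  out = main | df.Map('add', lambda x, s: x + s, pvalue.AsSingleton(side))
-
-The AsSingleton helper is backed by the ViewAsSingleton transform defined
-here; AsIter, AsList and AsDict are backed by the other ViewAs* transforms.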
-"""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow import typehints
-from google.cloud.dataflow.transforms.ptransform import PTransform
-
-
-class CreatePCollectionView(PTransform):
-  """Transform to materialize a given PCollectionView in the pipeline.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers.
-  """
-
-  def __init__(self, view):
-    self.view = view
-    super(CreatePCollectionView, self).__init__()
-
-  def infer_output_type(self, input_type):
-    # TODO(ccy): Figure out if we want to create a new type of type hint, i.e.,
-    # typehints.View[...].
-    return input_type
-
-  def apply(self, pcoll):
-    return self.view
-
-
-class ViewAsSingleton(PTransform):
-  """Transform to view PCollection as a singleton PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsSingleton(...) instead.
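-
-  Args:
-    has_default: whether to return a default when the viewed PCollection
-      is empty.
-    default_value: the value returned if the viewed PCollection is empty.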
-  """
-
-  def __init__(self, has_default, default_value, label=None):
-    if label:
-      label = 'ViewAsSingleton(%s)' % label
-    super(ViewAsSingleton, self).__init__(label=label)
-    self.has_default = has_default
-    self.default_value = default_value
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = input_type
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.SingletonPCollectionView(
-                    pcoll.pipeline, self.has_default, self.default_value))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
-
-class ViewAsIterable(PTransform):
-  """Transform to view PCollection as an iterable PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsIter(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsIterable(%s)' % label
-    super(ViewAsIterable, self).__init__(label=label)
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = typehints.Iterable[input_type]
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.IterablePCollectionView(pcoll.pipeline))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
-
-class ViewAsList(PTransform):
-  """Transform to view PCollection as a list PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsList(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsList(%s)' % label
-    super(ViewAsList, self).__init__(label=label)
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = typehints.List[input_type]
-    return (pcoll
-            | CreatePCollectionView(pvalue.ListPCollectionView(pcoll.pipeline))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
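-
-# Type variables declaring, via the decorators below, that ViewAsDict
-# consumes (key, value) pairs and produces a dict of matching types.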
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-
-
-@typehints.with_input_types(typehints.Tuple[K, V])
-@typehints.with_output_types(typehints.Dict[K, V])
-class ViewAsDict(PTransform):
-  """Transform to view PCollection as a dict PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsDict(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsDict(%s)' % label
-    super(ViewAsDict, self).__init__(label=label)
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    key_type, value_type = (
-        typehints.trivial_inference.key_value_types(input_type))
-    output_type = typehints.Dict[key_type, value_type]
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.DictPCollectionView(pcoll.pipeline))
-            .with_input_types(input_type)
-            .with_output_types(output_type))