You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:39 UTC

[04/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/window.py b/sdks/python/google/cloud/dataflow/transforms/window.py
deleted file mode 100644
index 6c0c2e8..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/window.py
+++ /dev/null
@@ -1,383 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Windowing concepts.
-
-A WindowInto transform logically divides up or groups the elements of a
-PCollection into finite windows according to a windowing function (derived from
-WindowFn).
-
-The output of WindowInto contains the same elements as input, but they have been
-logically assigned to windows. The next GroupByKey(s) transforms, including one
-within a composite transform, will group by the combination of keys and windows.
-
-Windowing a PCollection allows chunks of it to be processed individually, before
-the entire PCollection is available.  This is especially important for
-PCollection(s) with unbounded size, where the full PCollection is never
-available at once because more data is continually arriving. For PCollection(s)
-with a bounded size (a.k.a. conventional batch mode), by default, all data is
-implicitly in a single window (see GlobalWindows), unless WindowInto is
-applied.
-
-For example, a simple form of windowing divides up the data into fixed-width
-time intervals, using FixedWindows.
-
-Seconds are used as the time unit for the built-in windowing primitives here.
-Integer or floating point seconds can be passed to these primitives.
-
-Internally, seconds, with microsecond granularity, are stored as
-timeutil.Timestamp and timeutil.Duration objects. This is done to avoid
-precision errors that would occur with floating point representations.
-
-Custom windowing function classes can be created, by subclassing from
-WindowFn.
-"""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow.transforms import timeutil
-from google.cloud.dataflow.transforms.timeutil import Duration
-from google.cloud.dataflow.transforms.timeutil import MAX_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import MIN_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import Timestamp
-
-
-# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
-# behavior.
-class OutputTimeFn(object):
-  """Determines how output timestamps of grouping operations are assigned."""
-
-  OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
-  OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
-  OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
-  OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'
-
-  @staticmethod
-  def get_impl(output_time_fn, window_fn):
-    if output_time_fn == OutputTimeFn.OUTPUT_AT_EOW:
-      return timeutil.OutputAtEndOfWindowImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST:
-      return timeutil.OutputAtEarliestInputTimestampImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_LATEST:
-      return timeutil.OutputAtLatestInputTimestampImpl()
-    elif output_time_fn == OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED:
-      return timeutil.OutputAtEarliestTransformedInputTimestampImpl(window_fn)
-    else:
-      raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
-
-
-class WindowFn(object):
-  """An abstract windowing function defining a basic assign and merge."""
-
-  class AssignContext(object):
-    """Context passed to WindowFn.assign()."""
-
-    def __init__(self, timestamp, element=None, existing_windows=None):
-      self.timestamp = Timestamp.of(timestamp)
-      self.element = element
-      self.existing_windows = existing_windows
-
-  def assign(self, assign_context):
-    """Associates a timestamp and set of windows to an element."""
-    raise NotImplementedError
-
-  class MergeContext(object):
-    """Context passed to WindowFn.merge() to perform merging, if any."""
-
-    def __init__(self, windows):
-      self.windows = list(windows)
-
-    def merge(self, to_be_merged, merge_result):
-      raise NotImplementedError
-
-  def merge(self, merge_context):
-    """Returns a window that is the result of merging a set of windows."""
-    raise NotImplementedError
-
-  def get_window_coder(self):
-    return coders.PickleCoder()
-
-  def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
-    """Given input time and output window, returns output time for window.
-
-    If OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED is used in the Windowing,
-    the output timestamp for the given window will be the earliest of the
-    timestamps returned by get_transformed_output_time() for elements of the
-    window.
-
-    Arguments:
-      window: Output window of element.
-      input_timestamp: Input timestamp of element as a timeutil.Timestamp
-        object.
-
-    Returns:
-      Transformed timestamp.
-    """
-    # By default, just return the input timestamp.
-    return input_timestamp
-
-
-class BoundedWindow(object):
-  """A window for timestamps in range (-infinity, end).
-
-  Attributes:
-    end: End of window.
-  """
-
-  def __init__(self, end):
-    self.end = Timestamp.of(end)
-
-  def __cmp__(self, other):
-    # Order first by endpoint, then arbitrarily.
-    return cmp(self.end, other.end) or cmp(hash(self), hash(other))
-
-  def __eq__(self, other):
-    raise NotImplementedError
-
-  def __hash__(self):
-    return hash(self.end)
-
-  def __repr__(self):
-    return '[?, %s)' % float(self.end)
-
-
-class IntervalWindow(BoundedWindow):
-  """A window for timestamps in range [start, end).
-
-  Attributes:
-    start: Start of window as seconds since Unix epoch.
-    end: End of window as seconds since Unix epoch.
-  """
-
-  def __init__(self, start, end):
-    super(IntervalWindow, self).__init__(end)
-    self.start = Timestamp.of(start)
-
-  def __hash__(self):
-    return hash((self.start, self.end))
-
-  def __eq__(self, other):
-    return self.start == other.start and self.end == other.end
-
-  def __repr__(self):
-    return '[%s, %s)' % (float(self.start), float(self.end))
-
-  def intersects(self, other):
-    return other.start < self.end or self.start < other.end
-
-  def union(self, other):
-    return IntervalWindow(
-        min(self.start, other.start), max(self.end, other.end))
-
-
-class WindowedValue(object):
-  """A windowed value having a value, a timestamp and set of windows.
-
-  Attributes:
-    value: The underlying value of a windowed value.
-    timestamp: Timestamp associated with the value as seconds since Unix epoch.
-    windows: A set (iterable) of window objects for the value. The window
-      objects are descendants of the BoundedWindow class.
-  """
-
-  def __init__(self, value, timestamp, windows):
-    self.value = value
-    self.timestamp = Timestamp.of(timestamp)
-    self.windows = windows
-
-  def __repr__(self):
-    return '(%s, %s, %s)' % (
-        repr(self.value),
-        'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
-        'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
-        float(self.timestamp),
-        self.windows)
-
-  def __hash__(self):
-    return hash((self.value, self.timestamp, self.windows))
-
-  def __eq__(self, other):
-    return (type(self) == type(other)
-            and self.value == other.value
-            and self.timestamp == other.timestamp
-            and self.windows == other.windows)
-
-  def with_value(self, new_value):
-    return WindowedValue(new_value, self.timestamp, self.windows)
-
-
-class TimestampedValue(object):
-  """A timestamped value having a value and a timestamp.
-
-  Attributes:
-    value: The underlying value.
-    timestamp: Timestamp associated with the value as seconds since Unix epoch.
-  """
-
-  def __init__(self, value, timestamp):
-    self.value = value
-    self.timestamp = Timestamp.of(timestamp)
-
-
-class GlobalWindow(BoundedWindow):
-  """The default window into which all data is placed (via GlobalWindows)."""
-  _instance = None
-
-  def __new__(cls):
-    if cls._instance is None:
-      cls._instance = super(GlobalWindow, cls).__new__(cls)
-    return cls._instance
-
-  def __init__(self):
-    super(GlobalWindow, self).__init__(MAX_TIMESTAMP)
-    self.start = MIN_TIMESTAMP
-
-  def __repr__(self):
-    return 'GlobalWindow'
-
-  def __hash__(self):
-    return hash(type(self))
-
-  def __eq__(self, other):
-    # Global windows are always and only equal to each other.
-    return self is other or type(self) is type(other)
-
-
-class GlobalWindows(WindowFn):
-  """A windowing function that assigns everything to one global window."""
-
-  @classmethod
-  def windowed_value(cls, value, timestamp=MIN_TIMESTAMP):
-    return WindowedValue(value, timestamp, [GlobalWindow()])
-
-  def assign(self, assign_context):
-    return [GlobalWindow()]
-
-  def merge(self, merge_context):
-    pass  # No merging.
-
-  def get_window_coder(self):
-    return coders.SingletonCoder(GlobalWindow())
-
-  def __hash__(self):
-    return hash(type(self))
-
-  def __eq__(self, other):
-    # Global windowfns are always and only equal to each other.
-    return self is other or type(self) is type(other)
-
-  def __ne__(self, other):
-    return not self == other
-
-
-class FixedWindows(WindowFn):
-  """A windowing function that assigns each element to one time interval.
-
-  The attributes size and offset determine in what time interval a timestamp
-  will be slotted. The time intervals have the following formula:
-  [N * size + offset, (N + 1) * size + offset)
-
-  Attributes:
-    size: Size of the window as seconds.
-    offset: Offset of this window as seconds since Unix epoch. Windows start at
-      t=N * size + offset where t=0 is the epoch. The offset must be a value
-      in range [0, size). If it is not it will be normalized to this range.
-  """
-
-  def __init__(self, size, offset=0):
-    if size <= 0:
-      raise ValueError('The size parameter must be strictly positive.')
-    self.size = Duration.of(size)
-    self.offset = Timestamp.of(offset) % self.size
-
-  def assign(self, context):
-    timestamp = context.timestamp
-    start = timestamp - (timestamp - self.offset) % self.size
-    return [IntervalWindow(start, start + self.size)]
-
-  def merge(self, merge_context):
-    pass  # No merging.
-
-
-class SlidingWindows(WindowFn):
-  """A windowing function that assigns each element to a set of sliding windows.
-
-  The attributes size and offset determine in what time interval a timestamp
-  will be slotted. The time intervals have the following formula:
-  [N * period + offset, N * period + offset + size)
-
-  Attributes:
-    size: Size of the window as seconds.
-    period: Period of the windows as seconds.
-    offset: Offset of this window as seconds since Unix epoch. Windows start at
-      t=N * period + offset where t=0 is the epoch. The offset must be a value
-      in range [0, period). If it is not it will be normalized to this range.
-  """
-
-  def __init__(self, size, period, offset=0):
-    if size <= 0:
-      raise ValueError('The size parameter must be strictly positive.')
-    self.size = Duration.of(size)
-    self.period = Duration.of(period)
-    self.offset = Timestamp.of(offset) % size
-
-  def assign(self, context):
-    timestamp = context.timestamp
-    start = timestamp - (timestamp - self.offset) % self.period
-    return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
-            for s in range(start, start - self.size, -self.period)]
-
-  def merge(self, merge_context):
-    pass  # No merging.
-
-
-class Sessions(WindowFn):
-  """A windowing function that groups elements into sessions.
-
-  A session is defined as a series of consecutive events
-  separated by a specified gap size.
-
-  Attributes:
-    gap_size: Size of the gap between windows as floating-point seconds.
-  """
-
-  def __init__(self, gap_size):
-    if gap_size <= 0:
-      raise ValueError('The size parameter must be strictly positive.')
-    self.gap_size = Duration.of(gap_size)
-
-  def assign(self, context):
-    timestamp = context.timestamp
-    return [IntervalWindow(timestamp, timestamp + self.gap_size)]
-
-  def merge(self, merge_context):
-    to_merge = []
-    for w in sorted(merge_context.windows, key=lambda w: w.start):
-      if to_merge:
-        if end > w.start:
-          to_merge.append(w)
-          if w.end > end:
-            end = w.end
-        else:
-          if len(to_merge) > 1:
-            merge_context.merge(to_merge,
-                                IntervalWindow(to_merge[0].start, end))
-          to_merge = [w]
-          end = w.end
-      else:
-        to_merge = [w]
-        end = w.end
-    if len(to_merge) > 1:
-      merge_context.merge(to_merge, IntervalWindow(to_merge[0].start, end))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/window_test.py b/sdks/python/google/cloud/dataflow/transforms/window_test.py
deleted file mode 100644
index 155239f..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/window_test.py
+++ /dev/null
@@ -1,201 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the windowing classes."""
-
-import unittest
-
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms import CombinePerKey
-from google.cloud.dataflow.transforms import combiners
-from google.cloud.dataflow.transforms import core
-from google.cloud.dataflow.transforms import Create
-from google.cloud.dataflow.transforms import GroupByKey
-from google.cloud.dataflow.transforms import Map
-from google.cloud.dataflow.transforms import window
-from google.cloud.dataflow.transforms import WindowInto
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-from google.cloud.dataflow.transforms.window import FixedWindows
-from google.cloud.dataflow.transforms.window import IntervalWindow
-from google.cloud.dataflow.transforms.window import Sessions
-from google.cloud.dataflow.transforms.window import SlidingWindows
-from google.cloud.dataflow.transforms.window import TimestampedValue
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.transforms.window import WindowFn
-
-
-def context(element, timestamp, windows):
-  return WindowFn.AssignContext(timestamp, element, windows)
-
-
-sort_values = Map(lambda (k, vs): (k, sorted(vs)))
-
-
-class ReifyWindowsFn(core.DoFn):
-  def process(self, context):
-    key, values = context.element
-    for window in context.windows:
-      yield "%s @ %s" % (key, window), values
-reify_windows = core.ParDo(ReifyWindowsFn())
-
-class WindowTest(unittest.TestCase):
-
-  def test_fixed_windows(self):
-    # Test windows with offset: 2, 7, 12, 17, ...
-    windowfn = window.FixedWindows(size=5, offset=2)
-    self.assertEqual([window.IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 7, [])))
-    self.assertEqual([window.IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 11, [])))
-    self.assertEqual([window.IntervalWindow(12, 17)],
-                     windowfn.assign(context('v', 12, [])))
-
-    # Test windows without offset: 0, 5, 10, 15, ...
-    windowfn = window.FixedWindows(size=5)
-    self.assertEqual([window.IntervalWindow(5, 10)],
-                     windowfn.assign(context('v', 5, [])))
-    self.assertEqual([window.IntervalWindow(5, 10)],
-                     windowfn.assign(context('v', 9, [])))
-    self.assertEqual([window.IntervalWindow(10, 15)],
-                     windowfn.assign(context('v', 10, [])))
-
-    # Test windows with offset out of range.
-    windowfn = window.FixedWindows(size=5, offset=12)
-    self.assertEqual([window.IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 11, [])))
-
-  def test_sliding_windows_assignment(self):
-    windowfn = SlidingWindows(size=15, period=5, offset=2)
-    expected = [IntervalWindow(7, 22),
-                IntervalWindow(2, 17),
-                IntervalWindow(-3, 12)]
-    self.assertEqual(expected, windowfn.assign(context('v', 7, [])))
-    self.assertEqual(expected, windowfn.assign(context('v', 8, [])))
-    self.assertEqual(expected, windowfn.assign(context('v', 11, [])))
-
-  def test_sessions_merging(self):
-    windowfn = Sessions(10)
-
-    def merge(*timestamps):
-      windows = [windowfn.assign(context(None, t, [])) for t in timestamps]
-      running = set()
-
-      class TestMergeContext(WindowFn.MergeContext):
-
-        def __init__(self):
-          super(TestMergeContext, self).__init__(running)
-
-        def merge(self, to_be_merged, merge_result):
-          for w in to_be_merged:
-            if w in running:
-              running.remove(w)
-          running.add(merge_result)
-
-      for ws in windows:
-        running.update(ws)
-        windowfn.merge(TestMergeContext())
-      windowfn.merge(TestMergeContext())
-      return sorted(running)
-
-    self.assertEqual([IntervalWindow(2, 12)], merge(2))
-    self.assertEqual([IntervalWindow(2, 12), IntervalWindow(19, 29)],
-                     merge(2, 19))
-
-    self.assertEqual([IntervalWindow(2, 19)], merge(2, 9))
-    self.assertEqual([IntervalWindow(2, 19)], merge(9, 2))
-
-    self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
-                     merge(2, 9, 19))
-    self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
-                     merge(19, 9, 2))
-
-    self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))
-
-  def timestamped_key_values(self, pipeline, key, *timestamps):
-    return (pipeline | Create('start', timestamps)
-            | Map(lambda x: WindowedValue((key, x), x, [])))
-
-  def test_sliding_windows(self):
-    p = Pipeline('DirectPipelineRunner')
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
-    result = (pcoll
-              | WindowInto('w', SlidingWindows(period=2, size=4))
-              | GroupByKey()
-              | reify_windows)
-    expected = [('key @ [-2.0, 2.0)', [1]),
-                ('key @ [0.0, 4.0)', [1, 2, 3]),
-                ('key @ [2.0, 6.0)', [2, 3])]
-    assert_that(result, equal_to(expected))
-    p.run()
-
-  def test_sessions(self):
-    p = Pipeline('DirectPipelineRunner')
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
-    result = (pcoll
-              | WindowInto('w', Sessions(10))
-              | GroupByKey()
-              | sort_values
-              | reify_windows)
-    expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
-                ('key @ [20.0, 45.0)', [20, 27, 35])]
-    assert_that(result, equal_to(expected))
-    p.run()
-
-  def test_timestamped_value(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (p
-              | Create('start', [(k, k) for k in range(10)])
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              | WindowInto('w', FixedWindows(5))
-              | Map(lambda v: ('key', v))
-              | GroupByKey())
-    assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
-                                  ('key', [5, 6, 7, 8, 9])]))
-    p.run()
-
-  def test_timestamped_with_combiners(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (p
-              # Create some initial test values.
-              | Create('start', [(k, k) for k in range(10)])
-              # The purpose of the WindowInto transform is to establish a
-              # FixedWindows windowing function for the PCollection.
-              # It does not bucket elements into windows since the timestamps
-              # from Create are not spaced 5 s apart and very likely they all
-              # fall into the same window.
-              | WindowInto('w', FixedWindows(5))
-              # Generate timestamped values using the values as timestamps.
-              # Now there are values 5 s apart and since Map propagates the
-              # windowing function from input to output the output PCollection
-              # will have elements falling into different 5 s windows.
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              # We add a 'key' to each value representing the index of the
-              # window. This is important since there is no guarantee of
-              # order for the elements of a PCollection.
-              | Map(lambda v: (v / 5, v)))
-    # Sum all elements associated with a key and window. Although it
-    # is called CombinePerKey it is really CombinePerKeyAndWindow the
-    # same way GroupByKey is really GroupByKeyAndWindow.
-    sum_per_window = result | CombinePerKey(sum)
-    # Compute mean per key and window.
-    mean_per_window = result | combiners.Mean.PerKey()
-    assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
-                label='assert:sum')
-    assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
-                label='assert:mean')
-    p.run()
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py b/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
deleted file mode 100644
index ef8e191..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Unit tests for the write transform."""
-
-import logging
-import unittest
-
-import google.cloud.dataflow as df
-
-from google.cloud.dataflow.io import iobase
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.util import assert_that, is_empty
-from google.cloud.dataflow.utils.options import PipelineOptions
-
-
-class _TestSink(iobase.Sink):
-  TEST_INIT_RESULT = 'test_init_result'
-
-  def __init__(self, return_init_result=True, return_write_results=True):
-    self.return_init_result = return_init_result
-    self.return_write_results = return_write_results
-
-  def initialize_write(self):
-    if self.return_init_result:
-      return _TestSink.TEST_INIT_RESULT
-
-  def finalize_write(self, init_result, writer_results):
-    self.init_result_at_finalize = init_result
-    self.write_results_at_finalize = writer_results
-
-  def open_writer(self, init_result, uid):
-    writer = _TestWriter(init_result, uid, self.return_write_results)
-    return writer
-
-
-class _TestWriter(iobase.Writer):
-  STATE_UNSTARTED, STATE_WRITTEN, STATE_CLOSED = 0, 1, 2
-  TEST_WRITE_RESULT = 'test_write_result'
-
-  def __init__(self, init_result, uid, return_write_results=True):
-    self.state = _TestWriter.STATE_UNSTARTED
-    self.init_result = init_result
-    self.uid = uid
-    self.write_output = []
-    self.return_write_results = return_write_results
-
-  def close(self):
-    assert self.state in (
-        _TestWriter.STATE_WRITTEN, _TestWriter.STATE_UNSTARTED)
-    self.state = _TestWriter.STATE_CLOSED
-    if self.return_write_results:
-      return _TestWriter.TEST_WRITE_RESULT
-
-  def write(self, value):
-    if self.write_output:
-      assert self.state == _TestWriter.STATE_WRITTEN
-    else:
-      assert self.state == _TestWriter.STATE_UNSTARTED
-
-    self.state = _TestWriter.STATE_WRITTEN
-    self.write_output.append(value)
-
-
-class WriteToTestSink(PTransform):
-
-  def __init__(self, return_init_result=True, return_write_results=True):
-    self.return_init_result = return_init_result
-    self.return_write_results = return_write_results
-    self.last_sink = None
-    self.label = 'write_to_test_sink'
-
-  def apply(self, pcoll):
-    self.last_sink = _TestSink(return_init_result=self.return_init_result,
-                               return_write_results=self.return_write_results)
-    return pcoll | df.io.Write(self.last_sink)
-
-
-class WriteTest(unittest.TestCase):
-  DATA = ['some data', 'more data', 'another data', 'yet another data']
-
-  def _run_write_test(self,
-                      data,
-                      return_init_result=True,
-                      return_write_results=True):
-    write_to_test_sink = WriteToTestSink(return_init_result,
-                                         return_write_results)
-    p = Pipeline(options=PipelineOptions([]))
-    result = p | df.Create('start', data) | write_to_test_sink
-
-    assert_that(result, is_empty())
-    p.run()
-
-    sink = write_to_test_sink.last_sink
-    self.assertIsNotNone(sink)
-
-  def test_write(self):
-    self._run_write_test(WriteTest.DATA)
-
-  def test_write_with_empty_pcollection(self):
-    data = []
-    self._run_write_test(data)
-
-  def test_write_no_init_result(self):
-    self._run_write_test(WriteTest.DATA, return_init_result=False)
-
-  def test_write_no_write_results(self):
-    self._run_write_test(WriteTest.DATA, return_write_results=False)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/__init__.py b/sdks/python/google/cloud/dataflow/typehints/__init__.py
deleted file mode 100644
index 1585ad5..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A package defining the syntax and decorator semantics for type-hints."""
-
-# pylint: disable=wildcard-import
-from google.cloud.dataflow.typehints.typehints import *
-from google.cloud.dataflow.typehints.decorators import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/decorators.py b/sdks/python/google/cloud/dataflow/typehints/decorators.py
deleted file mode 100644
index 4e8182d..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/decorators.py
+++ /dev/null
@@ -1,530 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Type hinting decorators allowing static or runtime type-checking for the SDK.
-
-This module defines decorators which utilize the type-hints defined in
-'type_hints.py' to allow annotation of the types of function arguments and
-return values.
-
-Type-hints for functions are annotated using two separate decorators. One is for
-type-hinting the types of function arguments, the other for type-hinting the
-function return value. Type-hints can either be specified in the form of
-positional arguments::
-
-  @with_input_types(int, int)
-  def add(a, b):
-    return a + b
-
-Keyword arguments::
-
-  @with_input_types(a=int, b=int)
-  def add(a, b):
-    return a + b
-
-Or even a mix of both::
-
-  @with_input_types(int, b=int)
-  def add(a, b):
-    return a + b
-
-Example usage for type-hinting arguments only::
-
-  @with_input_types(s=str)
-  def to_lower(a):
-    return a.lower()
-
-Example usage for type-hinting return values only::
-
-  @with_output_types(Tuple[int, bool])
-  def compress_point(ec_point):
-    return ec_point.x, ec_point.y < 0
-
-Example usage for type-hinting both arguments and return values::
-
-  @with_input_types(a=int)
-  @with_output_types(str)
-  def int_to_str(a):
-    return str(a)
-
-Type-hinting a function with arguments that unpack tuples are also supported. As
-an example, such a function would be defined as::
-
-  def foo((a, b)):
-    ...
-
-The valid type-hint for such as function looks like the following::
-
-  @with_input_types(a=int, b=int)
-  def foo((a, b)):
-    ...
-
-Notice that we hint the type of each unpacked argument independently, rather
-than hinting the type of the tuple as a whole (Tuple[int, int]).
-
-Optionally, type-hints can be type-checked at runtime. To toggle this behavior
-this module defines two functions: 'enable_run_time_type_checking' and
-'disable_run_time_type_checking'. NOTE: for this toggle behavior to work
-properly it must appear at the top of the module where all functions are
-defined, or before importing a module containing type-hinted functions.
-"""
-
-import inspect
-import types
-
-from google.cloud.dataflow.typehints import check_constraint
-from google.cloud.dataflow.typehints import CompositeTypeHintError
-from google.cloud.dataflow.typehints import SimpleTypeHintError
-from google.cloud.dataflow.typehints import typehints
-from google.cloud.dataflow.typehints import validate_composite_type_param
-
-# This is missing in the builtin types module.  str.upper is arbitrary, any
-# method on a C-implemented type will do.
-# pylint: disable=invalid-name
-_MethodDescriptorType = type(str.upper)
-# pylint: enable=invalid-name
-
-
-# Monkeypatch inspect.getargspec to allow passing non-function objects.
-# This is needed to use higher-level functions such as getcallargs.
-_original_getargspec = inspect.getargspec
-
-
-def getargspec(func):
-  try:
-    return _original_getargspec(func)
-  except TypeError:
-    if isinstance(func, (type, types.ClassType)):
-      argspec = getargspec(func.__init__)
-      del argspec.args[0]
-      return argspec
-    elif callable(func):
-      try:
-        return _original_getargspec(func.__call__)
-      except TypeError:
-        # Return an ArgSpec with at least one positional argument,
-        # and any number of other (positional or keyword) arguments
-        # whose name won't match any real agument.
-        # Arguments with the %unknown% prefix will be ignored in the type
-        # checking code.
-        return inspect.ArgSpec(
-            ['_'], '%unknown%varargs', '%unknown%keywords', ())
-    else:
-      raise
-
-inspect.getargspec = getargspec
-
-
-class IOTypeHints(object):
-  """Encapsulates all type hint information about a Dataflow construct.
-
-  This should primarily be used via the WithTypeHints mixin class, though
-  may also be attached to other objects (such as Python functions).
-  """
-  __slots__ = ('input_types', 'output_types')
-
-  def __init__(self, input_types=None, output_types=None):
-    self.input_types = input_types
-    self.output_types = output_types
-
-  def set_input_types(self, *args, **kwargs):
-    self.input_types = args, kwargs
-
-  def set_output_types(self, *args, **kwargs):
-    self.output_types = args, kwargs
-
-  def simple_output_type(self, context):
-    if self.output_types:
-      args, kwargs = self.output_types
-      if len(args) != 1 or kwargs:
-        raise TypeError('Expected simple output type hint for %s' % context)
-      return args[0]
-
-  def copy(self):
-    return IOTypeHints(self.input_types, self.output_types)
-
-  def with_defaults(self, hints):
-    if not hints:
-      return self
-    elif not self:
-      return hints
-    else:
-      return IOTypeHints(self.input_types or hints.input_types,
-                         self.output_types or hints.output_types)
-
-  def __nonzero__(self):
-    return bool(self.input_types or self.output_types)
-
-  def __repr__(self):
-    return 'IOTypeHints[inputs=%s, outputs=%s]' % (
-        self.input_types, self.output_types)
-
-
-class WithTypeHints(object):
-  """A mixin class that provides the ability to set and retrieve type hints.
-  """
-
-  def __init__(self, *unused_args, **unused_kwargs):
-    self._type_hints = IOTypeHints()
-
-  def _get_or_create_type_hints(self):
-    # __init__ may have not been called
-    try:
-      return self._type_hints
-    except AttributeError:
-      self._type_hints = IOTypeHints()
-      return self._type_hints
-
-  def get_type_hints(self):
-    return (self._get_or_create_type_hints()
-            .with_defaults(self.default_type_hints())
-            .with_defaults(get_type_hints(self.__class__)))
-
-  def default_type_hints(self):
-    return None
-
-  def with_input_types(self, *arg_hints, **kwarg_hints):
-    self._get_or_create_type_hints().set_input_types(*arg_hints, **kwarg_hints)
-    return self
-
-  def with_output_types(self, *arg_hints, **kwarg_hints):
-    self._get_or_create_type_hints().set_output_types(*arg_hints, **kwarg_hints)
-    return self
-
-
-class TypeCheckError(Exception):
-  pass
-
-
-def _positional_arg_hints(arg, hints):
-  """Returns the type of a (possibly tuple-packed) positional argument.
-
-  E.g. for lambda ((a, b), c): None the single positional argument is (as
-  returned by inspect) [[a, b], c] which should have type
-  Tuple[Tuple[Int, Any], float] when applied to the type hints
-  {a: int, b: Any, c: float}.
-  """
-  if isinstance(arg, list):
-    return typehints.Tuple[[_positional_arg_hints(a, hints) for a in arg]]
-  else:
-    return hints.get(arg, typehints.Any)
-
-
-def _unpack_positional_arg_hints(arg, hint):
-  """Unpacks the given hint according to the nested structure of arg.
-
-  For example, if arg is [[a, b], c] and hint is Tuple[Any, int], than
-  this function would return ((Any, Any), int) so it can be used in conjunction
-  with inspect.getcallargs.
-  """
-  if isinstance(arg, list):
-    tuple_constraint = typehints.Tuple[[typehints.Any] * len(arg)]
-    if not typehints.is_consistent_with(hint, tuple_constraint):
-      raise typehints.TypeCheckError(
-          'Bad tuple arguments for %s: expected %s, got %s' % (
-              arg, tuple_constraint, hint))
-    if isinstance(hint, typehints.TupleConstraint):
-      return tuple(_unpack_positional_arg_hints(a, t)
-                   for a, t in zip(arg, hint.tuple_types))
-    else:
-      return (typehints.Any,) * len(arg)
-  else:
-    return hint
-
-
-def getcallargs_forhints(func, *typeargs, **typekwargs):
-  """Like inspect.getcallargs, but understands that Tuple[] and an Any unpack.
-  """
-  argspec = inspect.getargspec(func)
-  # Turn Tuple[x, y] into (x, y) so getcallargs can do the proper unpacking.
-  packed_typeargs = [_unpack_positional_arg_hints(arg, hint)
-                     for (arg, hint) in zip(argspec.args, typeargs)]
-  packed_typeargs += list(typeargs[len(packed_typeargs):])
-  try:
-    callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs)
-  except TypeError, e:
-    raise TypeCheckError(e)
-  if argspec.defaults:
-    # Declare any default arguments to be Any.
-    for k, var in enumerate(reversed(argspec.args)):
-      if k >= len(argspec.defaults):
-        break
-      if callargs.get(var, None) is argspec.defaults[-k]:
-        callargs[var] = typehints.Any
-  # Patch up varargs and keywords
-  if argspec.varargs:
-    callargs[argspec.varargs] = typekwargs.get(
-        argspec.varargs, typehints.Tuple[typehints.Any, ...])
-  if argspec.keywords:
-    # TODO(robertwb): Consider taking the union of key and value types.
-    callargs[argspec.keywords] = typekwargs.get(
-        argspec.keywords, typehints.Dict[typehints.Any, typehints.Any])
-  return callargs
-
-
-def get_type_hints(fn):
-  """Gets the type hint associated with an arbitrary object fn.
-
-  Always returns a valid IOTypeHints object, creating one if necissary.
-  """
-  # pylint: disable=protected-access
-  if not hasattr(fn, '_type_hints'):
-    try:
-      fn._type_hints = IOTypeHints()
-    except (AttributeError, TypeError):
-      # Can't add arbitrary attributes to this object,
-      # but might have some restrictions anyways...
-      hints = IOTypeHints()
-      if isinstance(fn, _MethodDescriptorType):
-        hints.set_input_types(fn.__objclass__)
-      return hints
-  return fn._type_hints
-  # pylint: enable=protected-access
-
-
-def with_input_types(*positional_hints, **keyword_hints):
-  """A decorator that type-checks defined type-hints with passed func arguments.
-
-  All type-hinted arguments can be specified using positional arguments,
-  keyword arguments, or a mix of both. Additionaly, all function arguments must
-  be type-hinted in totality if even one parameter is type-hinted.
-
-  Once fully decorated, if the arguments passed to the resulting function
-  violate the type-hint constraints defined, a TypeCheckError detailing the
-  error will be raised.
-
-  To be used as::
-
-    * @with_input_types(s=str)  # just @with_input_types(str) will work too.
-      def upper(s):
-        return s.upper()
-
-  Or::
-
-    * @with_input_types(ls=List[Tuple[int, int])
-      def increment(ls):
-        [(i + 1, j + 1) for (i,j) in ls]
-
-  Args:
-    *positional_hints: Positional type-hints having identical order as the
-      function's formal arguments. Values for this argument must either be a
-      built-in Python type or an instance of a TypeContraint created by
-      'indexing' a CompositeTypeHint instance with a type parameter.
-    **keyword_hints: Keyword arguments mirroring the names of the parameters to
-      the decorated functions. The value of each keyword argument must either
-      be one of the allowed built-in Python types, a custom class, or an
-      instance of a TypeContraint created by 'indexing' a CompositeTypeHint
-      instance with a type parameter.
-
-  Raises:
-    ValueError: If not all function arguments have corresponding type-hints
-      specified. Or if the inner wrapper function isn't passed a function
-      object.
-    TypeCheckError: If the any of the passed type-hint constraints are not a
-      type or TypeContraint instance.
-
-  Returns:
-    The original function decorated such that it enforces type-hint constraints
-    for all received function arguments.
-  """
-
-  def annotate(f):
-    if isinstance(f, types.FunctionType):
-      for t in list(positional_hints) + list(keyword_hints.values()):
-        validate_composite_type_param(
-            t, error_msg_prefix='All type hint arguments')
-
-    get_type_hints(f).set_input_types(*positional_hints, **keyword_hints)
-    return f
-  return annotate
-
-
-def with_output_types(*return_type_hint, **kwargs):
-  """A decorator that type-checks defined type-hints for return values(s).
-
-  This decorator will type-check the return value(s) of the decorated function.
-
-  Only a single type-hint is accepted to specify the return type of the return
-  value. If the function to be decorated has multiple return values, then one
-  should use: 'Tuple[type_1, type_2]' to annotate the types of the return
-  values.
-
-  If the ultimate return value for the function violates the specified type-hint
-  a TypeCheckError will be raised detailing the type-constraint violation.
-
-  This decorator is intended to be used like::
-
-    * @with_output_types(Set[Coordinate])
-      def parse_ints(ints):
-        ....
-        return [Coordinate.from_int(i) for i in ints]
-
-  Or with a simple type-hint::
-
-    * @with_output_types(bool)
-      def negate(p):
-        return not p if p else p
-
-  Args:
-    *return_type_hint: A type-hint specifying the proper return type of the
-      function. This argument should either be a built-in Python type or an
-      instance of a 'TypeConstraint' created by 'indexing' a
-      'CompositeTypeHint'.
-    **kwargs: Not used.
-
-  Raises:
-    ValueError: If any kwarg parameters are passed in, or the length of
-      'return_type_hint' is greater than 1. Or if the inner wrapper function
-      isn't passed a function object.
-    TypeCheckError: If the 'return_type_hint' object is in invalid type-hint.
-
-  Returns:
-    The original function decorated such that it enforces type-hint constraints
-    for all return values.
-  """
-  if kwargs:
-    raise ValueError("All arguments for the 'returns' decorator must be "
-                     "positional arguments.")
-
-  if len(return_type_hint) != 1:
-    raise ValueError("'returns' accepts only a single positional argument. In "
-                     "order to specify multiple return types, use the 'Tuple' "
-                     "type-hint.")
-
-  return_type_hint = return_type_hint[0]
-
-  validate_composite_type_param(
-      return_type_hint,
-      error_msg_prefix='All type hint arguments'
-  )
-
-  def annotate(f):
-    get_type_hints(f).set_output_types(return_type_hint)
-    return f
-  return annotate
-
-
-def _check_instance_type(
-    type_constraint, instance, var_name=None, verbose=False):
-  """A helper function to report type-hint constraint violations.
-
-  Args:
-    type_constraint: An instance of a 'TypeConstraint' or a built-in Python
-      type.
-    instance: The candidate object which will be checked by to satisfy
-      'type_constraint'.
-    var_name: If 'instance' is an argument, then the actual name for the
-      parameter in the original function definition.
-
-  Raises:
-    TypeCheckError: If 'instance' fails to meet the type-constraint of
-      'type_constraint'.
-  """
-  hint_type = (
-      "argument: '%s'" % var_name if var_name is not None else 'return type')
-
-  try:
-    check_constraint(type_constraint, instance)
-  except SimpleTypeHintError:
-    if verbose:
-      verbose_instance = '%s, ' % instance
-    else:
-      verbose_instance = ''
-    raise TypeCheckError('Type-hint for %s violated. Expected an '
-                         'instance of %s, instead found %san instance of %s.'
-                         % (hint_type, type_constraint,
-                            verbose_instance, type(instance)))
-  except CompositeTypeHintError as e:
-    raise TypeCheckError('Type-hint for %s violated: %s' % (hint_type, e))
-
-
-def _interleave_type_check(type_constraint, var_name=None):
-  """Lazily type-check the type-hint for a lazily generated sequence type.
-
-  This function can be applied as a decorator or called manually in a curried
-  manner:
-    * @_interleave_type_check(List[int])
-      def gen():
-        yield 5
-
-    or
-
-     * gen = _interleave_type_check(Tuple[int, int], 'coord_gen')(gen)
-
-  As a result, all type-checking for the passed generator will occur at 'yield'
-  time. This way, we avoid having to depleat the generator in order to
-  type-check it.
-
-  Args:
-    type_constraint: An instance of a TypeConstraint. The output yielded of
-      'gen' will be type-checked according to this type constraint.
-    var_name: The variable name binded to 'gen' if type-checking a function
-      argument. Used solely for templating in error message generation.
-
-  Returns:
-    A function which takes a generator as an argument and returns a wrapped
-    version of the generator that interleaves type-checking at 'yield'
-    iteration. If the generator received is already wrapped, then it is simply
-    returned to avoid nested wrapping.
-  """
-  def wrapper(gen):
-    if isinstance(gen, GeneratorWrapper):
-      return gen
-    else:
-      return GeneratorWrapper(
-          gen,
-          lambda x: _check_instance_type(type_constraint, x, var_name)
-      )
-  return wrapper
-
-
-class GeneratorWrapper(object):
-  """A wrapper around a generator, allows execution of a callback per yield.
-
-  Additionally, wrapping a generator with this class allows one to assign
-  arbitary attributes to a generator object just as with a function object.
-
-  Attributes:
-    internal_gen: A instance of a generator object. As part of 'step' of the
-      generator, the yielded object will be passed to 'interleave_func'.
-    interleave_func: A callback accepting a single argument. This function will
-      be called with the result of each yielded 'step' in the internal
-      generator.
-  """
-
-  def __init__(self, gen, interleave_func):
-    self.internal_gen = gen
-    self.interleave_func = interleave_func
-
-  def __getattr__(self, attr):
-    # TODO(laolu): May also want to intercept 'send' in the future if we move to
-    # a GeneratorHint with 3 type-params:
-    #   * Generator[send_type, return_type, yield_type]
-    if attr == '__next__':
-      return self.__next__()
-    elif attr == '__iter__':
-      return self.__iter__()
-    else:
-      return getattr(self.internal_gen, attr)
-
-  def next(self):
-    next_val = next(self.internal_gen)
-    self.interleave_func(next_val)
-    return next_val
-
-  def __iter__(self):
-    while True:
-      x = next(self.internal_gen)
-      self.interleave_func(x)
-      yield x

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/opcodes.py b/sdks/python/google/cloud/dataflow/typehints/opcodes.py
deleted file mode 100644
index 9b5fd52..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/opcodes.py
+++ /dev/null
@@ -1,331 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Defines the actions various bytecodes have on the frame.
-
-Each function here corresponds to a bytecode documented in
-https://docs.python.org/2/library/dis.html.  The first argument is a (mutable)
-FrameState object, the second the integer opcode argument.
-
-Bytecodes with more complicated behavior (e.g. modifying the program counter)
-are handled inline rather than here.
-"""
-import types
-
-from trivial_inference import union, element_type, Const, BoundMethod
-import typehints
-from typehints import Any, Dict, Iterable, List, Tuple, Union
-
-
-def pop_one(state, unused_arg):
-  del state.stack[-1:]
-
-
-def pop_two(state, unused_arg):
-  del state.stack[-2:]
-
-
-def pop_three(state, unused_arg):
-  del state.stack[-3:]
-
-
-def push_value(v):
-
-  def pusher(state, unused_arg):
-    state.stack.append(v)
-
-  return pusher
-
-
-def nop(unused_state, unused_arg):
-  pass
-
-
-def pop_top(state, unused_arg):
-  state.stack.pop()
-
-
-def rot_n(state, n):
-  state.stack[-n:] = [state.stack[-1]] + state.stack[-n:-1]
-
-
-def rot_two(state, unused_arg):
-  rot_n(state, 2)
-
-
-def rot_three(state, unused_arg):
-  rot_n(state, 3)
-
-
-def rot_four(state, unused_arg):
-  rot_n(state, 4)
-
-
-def dup_top(state, unused_arg):
-  state.stack.append(state.stack[-1])
-
-
-def unary(state, unused_arg):
-  state.stack[-1] = Const.unwrap(state.stack[-1])
-
-
-unary_positive = unary_negative = unary_invert = unary
-
-
-def unary_not(state, unused_arg):
-  state.stack[-1] = bool
-
-
-def unary_convert(state, unused_arg):
-  state.stack[-1] = str
-
-
-def get_iter(state, unused_arg):
-  state.stack.append(Iterable[element_type(state.stack.pop())])
-
-
-def symmetric_binary_op(state, unused_arg):
-  # TODO(robertwb): This may not be entirely correct...
-  b, a = state.stack.pop(), state.stack.pop()
-  if a == b:
-    state.stack.append(a)
-  elif type(a) == type(b) and isinstance(a, typehints.SequenceTypeConstraint):
-    state.stack.append(type(a)(union(element_type(a), element_type(b))))
-  else:
-    state.stack.append(Any)
-# Except for int ** -int
-binary_power = inplace_power = symmetric_binary_op
-binary_multiply = inplace_multiply = symmetric_binary_op
-binary_divide = inplace_divide = symmetric_binary_op
-binary_floor_divide = inplace_floor_divide = symmetric_binary_op
-
-
-def binary_true_divide(state, unused_arg):
-  u = union(state.stack.pop(), state.stack.pop)
-  if u == int:
-    state.stack.append(float)
-  else:
-    state.stack.append(u)
-
-
-inplace_true_divide = binary_true_divide
-
-binary_modulo = inplace_modulo = symmetric_binary_op
-# TODO(robertwb): Tuple add.
-binary_add = inplace_add = symmetric_binary_op
-binary_subtract = inplace_subtract = symmetric_binary_op
-
-
-def binary_subscr(state, unused_arg):
-  tos = state.stack.pop()
-  if tos in (str, unicode):
-    out = tos
-  else:
-    out = element_type(tos)
-  state.stack.append(out)
-
-# As far as types are concerned.
-binary_lshift = inplace_lshift = binary_rshift = inplace_rshift = pop_top
-
-binary_and = inplace_and = symmetric_binary_op
-binary_xor = inplace_xor = symmetric_binary_op
-binary_or = inpalce_or = symmetric_binary_op
-
-# As far as types are concerned.
-slice_0 = nop
-slice_1 = slice_2 = pop_top
-slice_3 = pop_two
-store_slice_0 = store_slice_1 = store_slice_2 = store_slice_3 = nop
-delete_slice_0 = delete_slice_1 = delete_slice_2 = delete_slice_3 = nop
-
-
-def store_subscr(unused_state, unused_args):
-  # TODO(robertwb): Update element/value type of iterable/dict.
-  pass
-
-
-binary_divide = binary_floor_divide = binary_modulo = symmetric_binary_op
-binary_divide = binary_floor_divide = binary_modulo = symmetric_binary_op
-binary_divide = binary_floor_divide = binary_modulo = symmetric_binary_op
-
-# print_expr
-print_item = pop_top
-# print_item_to
-print_newline = nop
-
-# print_newline_to
-
-
-# break_loop
-# continue_loop
-def list_append(state, arg):
-  state.stack[-arg] = List[Union[element_type(state.stack[-arg]),
-                                 Const.unwrap(state.stack.pop())]]
-
-
-load_locals = push_value(Dict[str, Any])
-
-# return_value
-# yield_value
-# import_star
-exec_stmt = pop_three
-# pop_block
-# end_finally
-build_class = pop_three
-
-# setup_with
-# with_cleanup
-
-
-# store_name
-# delete_name
-def unpack_sequence(state, arg):
-  t = state.stack.pop()
-  if isinstance(t, Const):
-    try:
-      unpacked = [Const(ti) for ti in t.value]
-      if len(unpacked) != arg:
-        unpacked = [Any] * arg
-    except TypeError:
-      unpacked = [Any] * arg
-  elif (isinstance(t, typehints.TupleHint.TupleConstraint)
-        and len(t.tuple_types) == arg):
-    unpacked = list(t.tuple_types)
-  else:
-    unpacked = [element_type(t)] * arg
-  state.stack += reversed(unpacked)
-
-
-def dup_topx(state, arg):
-  state.stack += state[-arg:]
-
-
-store_attr = pop_top
-delete_attr = nop
-store_global = pop_top
-delete_global = nop
-
-
-def load_const(state, arg):
-  state.stack.append(state.const_type(arg))
-
-
-load_name = push_value(Any)
-
-
-def build_tuple(state, arg):
-  if arg == 0:
-    state.stack.append(Tuple[()])
-  else:
-    state.stack[-arg:] = [Tuple[[Const.unwrap(t) for t in state.stack[-arg:]]]]
-
-
-def build_list(state, arg):
-  if arg == 0:
-    state.stack.append(List[Union[()]])
-  else:
-    state.stack[-arg:] = [List[reduce(union, state.stack[-arg:], Union[()])]]
-
-
-build_map = push_value(Dict[Any, Any])
-
-
-def load_attr(state, arg):
-  o = state.stack.pop()
-  name = state.get_name(arg)
-  if isinstance(o, Const) and hasattr(o.value, name):
-    state.stack.append(Const(getattr(o.value, name)))
-  elif (isinstance(o, (type, types.ClassType))
-        and isinstance(getattr(o, name, None), types.MethodType)):
-    state.stack.append(Const(BoundMethod(getattr(o, name))))
-  else:
-    state.stack.append(Any)
-
-
-def compare_op(state, unused_arg):
-  # Could really be anything...
-  state.stack[-2:] = [bool]
-
-
-def import_name(state, unused_arg):
-  state.stack[-2:] = [Any]
-
-
-import_from = push_value(Any)
-
-# jump
-
-# for_iter
-
-
-def load_global(state, arg):
-  state.stack.append(state.get_global(arg))
-
-# setup_loop
-# setup_except
-# setup_finally
-
-store_map = pop_two
-
-
-def load_fast(state, arg):
-  state.stack.append(state.vars[arg])
-
-
-def store_fast(state, arg):
-  state.vars[arg] = state.stack.pop()
-
-
-def delete_fast(state, arg):
-  state.vars[arg] = Any  # really an error
-
-
-def load_closure(state, unused_arg):
-  state.stack.append(Any)  # really a Cell
-
-
-def load_deref(state, arg):
-  state.stack.append(state.closure_type(arg))
-# raise_varargs
-
-
-def call_function(state, arg, has_var=False, has_kw=False):
-  # TODO(robertwb): Recognize builtins and dataflow objects
-  # (especially special return values).
-  pop_count = (arg & 0xF) + (arg & 0xF0) / 8 + 1 + has_var + has_kw
-  state.stack[-pop_count:] = [Any]
-
-
-def make_function(state, arg):
-  state.stack[-arg - 1:] = [Any]  # a callable
-
-
-def make_closure(state, arg):
-  state.stack[-arg - 2:] = [Any]  # a callable
-
-
-def build_slice(state, arg):
-  state.stack[-arg:] = [Any]  # a slice object
-
-
-def call_function_var(state, arg):
-  call_function(state, arg, has_var=True)
-
-
-def call_function_kw(state, arg):
-  call_function(state, arg, has_kw=True)
-
-
-def call_function_var_wk(state, arg):
-  call_function(state, arg, has_var=True, has_kw=True)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py b/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
deleted file mode 100644
index dd117d3..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
+++ /dev/null
@@ -1,415 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Trivial type inference for simple functions.
-"""
-import __builtin__
-import collections
-import dis
-import pprint
-import sys
-import types
-
-from google.cloud.dataflow.typehints import Any
-from google.cloud.dataflow.typehints import typehints
-
-
-class TypeInferenceError(ValueError):
-  """Error to raise when type inference failed."""
-  pass
-
-
-def instance_to_type(o):
-  """Given a Python object o, return the corresponding type hint.
-  """
-  t = type(o)
-  if o is None:
-    # TODO(robertwb): Eliminate inconsistent use of None vs. NoneType.
-    return None
-  elif t not in typehints.DISALLOWED_PRIMITIVE_TYPES:
-    if t == types.InstanceType:
-      return o.__class__
-    elif t == BoundMethod:
-      return types.MethodType
-    else:
-      return t
-  elif t == tuple:
-    return typehints.Tuple[[instance_to_type(item) for item in o]]
-  elif t == list:
-    return typehints.List[
-        typehints.Union[[instance_to_type(item) for item in o]]
-    ]
-  elif t == set:
-    return typehints.Set[
-        typehints.Union[[instance_to_type(item) for item in o]]
-    ]
-  elif t == dict:
-    return typehints.Dict[
-        typehints.Union[[instance_to_type(k) for k, v in o.items()]],
-        typehints.Union[[instance_to_type(v) for k, v in o.items()]],
-    ]
-  else:
-    raise TypeInferenceError('Unknown forbidden type: %s' % t)
-
-
-def union_list(xs, ys):
-  assert len(xs) == len(ys)
-  return [union(x, y) for x, y in zip(xs, ys)]
-
-
-class Const(object):
-
-  def __init__(self, value):
-    self.value = value
-    self.type = instance_to_type(value)
-
-  def __eq__(self, other):
-    return isinstance(other, Const) and self.value == other.value
-
-  def __hash__(self):
-    return hash(self.value)
-
-  def __repr__(self):
-    return 'Const[%s]' % str(self.value)[:100]
-
-  @staticmethod
-  def unwrap(x):
-    if isinstance(x, Const):
-      return x.type
-    else:
-      return x
-
-  @staticmethod
-  def unwrap_all(xs):
-    return [Const.unwrap(x) for x in xs]
-
-
-class FrameState(object):
-  """Stores the state of the frame at a particular point of execution.
-  """
-
-  def __init__(self, f, local_vars=None, stack=()):
-    self.f = f
-    self.co = f.func_code
-    self.vars = list(local_vars)
-    self.stack = list(stack)
-
-  def __eq__(self, other):
-    return self.__dict__ == other.__dict__
-
-  def copy(self):
-    return FrameState(self.f, self.vars, self.stack)
-
-  def const_type(self, i):
-    return Const(self.co.co_consts[i])
-
-  def closure_type(self, i):
-    ncellvars = len(self.co.co_cellvars)
-    if i < ncellvars:
-      return Any
-    else:
-      return Const(self.f.func_closure[i - ncellvars].cell_contents)
-
-  def get_global(self, i):
-    name = self.get_name(i)
-    if name in self.f.func_globals:
-      return Const(self.f.func_globals[name])
-    if name in __builtin__.__dict__:
-      return Const(__builtin__.__dict__[name])
-    else:
-      return Any
-
-  def get_name(self, i):
-    return self.co.co_names[i]
-
-  def __repr__(self):
-    return 'Stack: %s Vars: %s' % (self.stack, self.vars)
-
-  def __or__(self, other):
-    if self is None:
-      return other.copy()
-    elif other is None:
-      return self.copy()
-    else:
-      return FrameState(self.f, union_list(self.vars, other.vars), union_list(
-          self.stack, other.stack))
-
-  def __ror__(self, left):
-    return self | left
-
-
-def union(a, b):
-  """Returns the union of two types or Const values.
-  """
-  if a == b:
-    return a
-  elif not a:
-    return b
-  elif not b:
-    return a
-  a = Const.unwrap(a)
-  b = Const.unwrap(b)
-  # TODO(robertwb): Work this into the Union code in a more generic way.
-  if type(a) == type(b) and element_type(a) == typehints.Union[()]:
-    return b
-  elif type(a) == type(b) and element_type(b) == typehints.Union[()]:
-    return a
-  else:
-    return typehints.Union[a, b]
-
-
-def element_type(hint):
-  """Returns the element type of a composite type.
-  """
-  hint = Const.unwrap(hint)
-  if isinstance(hint, typehints.SequenceTypeConstraint):
-    return hint.inner_type
-  elif isinstance(hint, typehints.TupleHint.TupleConstraint):
-    return typehints.Union[hint.tuple_types]
-  else:
-    return Any
-
-
-def key_value_types(kv_type):
-  """Returns the key and value type of a KV type.
-  """
-  # TODO(robertwb): Unions of tuples, etc.
-  # TODO(robertwb): Assert?
-  if (isinstance(kv_type, typehints.TupleHint.TupleConstraint)
-      and len(kv_type.tuple_types) == 2):
-    return kv_type.tuple_types
-  return Any, Any
-
-
-known_return_types = {len: int, hash: int,}
-
-
-class BoundMethod(object):
-  """Used to create a bound method when we only know the type of the instance.
-  """
-
-  def __init__(self, unbound):
-    self.unbound = unbound
-
-
-def hashable(c):
-  try:
-    hash(c)
-    return True
-  except TypeError:
-    return False
-
-
-def infer_return_type(c, input_types, debug=False, depth=5):
-  """Analyses a callable to deduce its return type.
-
-  Args:
-    c: A Python callable to infer the return type of.
-    input_types: A sequence of type hints corresponding to the callable's inputs.
-    debug: Whether to print verbose debugging information.
-
-  Returns:
-    A TypeConstraint that the return value of this function will (likely)
-    satisfy given the specified inputs.
-  """
-  try:
-    if hashable(c) and c in known_return_types:
-      return known_return_types[c]
-    elif isinstance(c, types.FunctionType):
-      return infer_return_type_func(c, input_types, debug, depth)
-    elif isinstance(c, types.MethodType):
-      if c.im_self is not None:
-        input_types = [Const(c.im_self)] + input_types
-      return infer_return_type_func(c.im_func, input_types, debug, depth)
-    elif isinstance(c, BoundMethod):
-      input_types = [c.unbound.im_class] + input_types
-      return infer_return_type_func(c.unbound.im_func, input_types, debug, depth)
-    elif isinstance(c, (type, types.ClassType)):
-      if c in typehints.DISALLOWED_PRIMITIVE_TYPES:
-        return {
-            list: typehints.List[Any],
-            set: typehints.Set[Any],
-            tuple: typehints.Tuple[Any, ...],
-            dict: typehints.Dict[Any, Any]
-        }[c]
-      else:
-        return c
-    else:
-      return Any
-  except TypeInferenceError:
-    return Any
-  except Exception:
-    if debug:
-      sys.stdout.flush()
-      raise
-    else:
-      return Any
-
-
-def infer_return_type_func(f, input_types, debug=False, depth=0):
-  """Analyses a function to deduce its return type.
-
-  Args:
-    f: A Python function object to infer the return type of.
-    input_types: A sequence of inputs corresponding to the input types.
-    debug: Whether to print verbose debugging information.
-
-  Returns:
-    A TypeConstraint that the return value of this function will (likely)
-    satisfy given the specified inputs.
-
-  Raises:
-    TypeInferenceError: if no type can be inferred.
-  """
-  if debug:
-    print
-    print f, id(f), input_types
-  import opcodes
-  simple_ops = dict((k.upper(), v) for k, v in opcodes.__dict__.items())
-
-  co = f.func_code
-  code = co.co_code
-  end = len(code)
-  pc = 0
-  extended_arg = 0
-  free = None
-
-  yields = set()
-  returns = set()
-  # TODO(robertwb): Default args via inspect module.
-  local_vars = list(input_types) + [typehints.Union[()]] * (len(co.co_varnames)
-                                                            - len(input_types))
-  state = FrameState(f, local_vars)
-  states = collections.defaultdict(lambda: None)
-  jumps = collections.defaultdict(int)
-
-  last_pc = -1
-  while pc < end:
-    start = pc
-    op = ord(code[pc])
-
-    if debug:
-      print '-->' if pc == last_pc else '    ',
-      print repr(pc).rjust(4),
-      print dis.opname[op].ljust(20),
-    pc += 1
-    if op >= dis.HAVE_ARGUMENT:
-      arg = ord(code[pc]) + ord(code[pc + 1]) * 256 + extended_arg
-      extended_arg = 0
-      pc += 2
-      if op == dis.EXTENDED_ARG:
-        extended_arg = arg * 65536L
-      if debug:
-        print str(arg).rjust(5),
-        if op in dis.hasconst:
-          print '(' + repr(co.co_consts[arg]) + ')',
-        elif op in dis.hasname:
-          print '(' + co.co_names[arg] + ')',
-        elif op in dis.hasjrel:
-          print '(to ' + repr(pc + arg) + ')',
-        elif op in dis.haslocal:
-          print '(' + co.co_varnames[arg] + ')',
-        elif op in dis.hascompare:
-          print '(' + dis.cmp_op[arg] + ')',
-        elif op in dis.hasfree:
-          if free is None:
-            free = co.co_cellvars + co.co_freevars
-          print '(' + free[arg] + ')',
-
-    # Actually emulate the op.
-    if state is None and states[start] is None:
-      # No control reaches here (yet).
-      if debug:
-        print
-      continue
-    state |= states[start]
-
-    opname = dis.opname[op]
-    jmp = jmp_state = None
-    if opname.startswith('CALL_FUNCTION'):
-      standard_args = (arg & 0xF) + (arg & 0xF0) / 8
-      var_args = 'VAR' in opname
-      kw_args = 'KW' in opname
-      pop_count = standard_args + var_args + kw_args + 1
-      if depth <= 0:
-        return_type = Any
-      elif arg & 0xF0:
-        # TODO(robertwb): Handle this case.
-        return_type = Any
-      elif isinstance(state.stack[-pop_count], Const):
-        # TODO(robertwb): Handle this better.
-        if var_args or kw_args:
-          state.stack[-1] = Any
-          state.stack[-var_args - kw_args] = Any
-        inputs = [] if pop_count == 1 else state.stack[1 - pop_count:]
-        return_type = infer_return_type(state.stack[-pop_count].value,
-                                        state.stack[1 - pop_count:],
-                                        debug=debug,
-                                        depth=depth - 1)
-      else:
-        return_type = Any
-      state.stack[-pop_count:] = [return_type]
-    elif opname in simple_ops:
-      simple_ops[opname](state, arg)
-    elif opname == 'RETURN_VALUE':
-      returns.add(state.stack[-1])
-      state = None
-    elif opname == 'YIELD_VALUE':
-      yields.add(state.stack[-1])
-    elif opname == 'JUMP_FORWARD':
-      jmp = pc + arg
-      jmp_state = state
-      state = None
-    elif opname == 'JUMP_ABSOLUTE':
-      jmp = arg
-      jmp_state = state
-      state = None
-    elif opname in ('POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE'):
-      state.stack.pop()
-      jmp = arg
-      jmp_state = state.copy()
-    elif opname in ('JUMP_IF_TRUE_OR_POP', 'JUMP_IF_FALSE_OR_POP'):
-      jmp = arg
-      jmp_state = state.copy()
-      state.stack.pop()
-    elif opname == 'FOR_ITER':
-      jmp = pc + arg
-      jmp_state = state.copy()
-      jmp_state.stack.pop()
-      state.stack.append(element_type(state.stack[-1]))
-    else:
-      raise TypeInferenceError('unable to handle %s' % opname)
-
-    if jmp is not None:
-      # TODO(robertwb): Is this guaranteed to converge?
-      new_state = states[jmp] | jmp_state
-      if jmp < pc and new_state != states[jmp] and jumps[pc] < 5:
-        jumps[pc] += 1
-        pc = jmp
-      states[jmp] = new_state
-
-    if debug:
-      print
-      print state
-      pprint.pprint(dict(item for item in states.items() if item[1]))
-
-  if yields:
-    result = typehints.Iterable[reduce(union, Const.unwrap_all(yields))]
-  else:
-    result = reduce(union, Const.unwrap_all(returns))
-
-  if debug:
-    print f, id(f), input_types, '->', result
-  return result
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py b/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
deleted file mode 100644
index 5d945ba..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests for google.cloud.dataflow.typehints.trivial_inference."""
-import unittest
-
-
-from google.cloud.dataflow.typehints import trivial_inference
-from google.cloud.dataflow.typehints import typehints
-
-global_int = 1
-
-
-class TrivialInferenceTest(unittest.TestCase):
-
-  def assertReturnType(self, expected, f, inputs=()):
-    self.assertEquals(expected, trivial_inference.infer_return_type(f, inputs))
-
-  def testIdentity(self):
-    self.assertReturnType(int, lambda x: x, [int])
-
-  def testTuples(self):
-    self.assertReturnType(
-        typehints.Tuple[typehints.Tuple[()], int], lambda x: ((), x), [int])
-    self.assertReturnType(
-        typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
-
-  def testUnpack(self):
-    def reverse((a, b)):
-      return b, a
-    any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
-    self.assertReturnType(
-        typehints.Tuple[int, float], reverse, [typehints.Tuple[float, int]])
-    self.assertReturnType(
-        typehints.Tuple[int, int], reverse, [typehints.Tuple[int, ...]])
-    self.assertReturnType(
-        typehints.Tuple[int, int], reverse, [typehints.List[int]])
-    self.assertReturnType(
-        typehints.Tuple[typehints.Union[int, float, str],
-                        typehints.Union[int, float, str]],
-        reverse, [typehints.Tuple[int, float, str]])
-    self.assertReturnType(any_tuple, reverse, [typehints.Any])
-
-    self.assertReturnType(typehints.Tuple[int, float],
-                          reverse, [trivial_inference.Const((1.0, 1))])
-    self.assertReturnType(any_tuple,
-                          reverse, [trivial_inference.Const((1, 2, 3))])
-
-  def testListComprehension(self):
-    self.assertReturnType(
-        typehints.List[int],
-        lambda xs: [x for x in xs],
-        [typehints.Tuple[int, ...]])
-
-  def testTupleListComprehension(self):
-    self.assertReturnType(
-        typehints.List[int],
-        lambda xs: [x for x in xs],
-        [typehints.Tuple[int, int, int]])
-    self.assertReturnType(
-        typehints.List[typehints.Union[int, float]],
-        lambda xs: [x for x in xs],
-        [typehints.Tuple[int, float]])
-
-  def testGenerator(self):
-
-    def foo(x, y):
-      yield x
-      yield y
-
-    self.assertReturnType(typehints.Iterable[int], foo, [int, int])
-    self.assertReturnType(
-        typehints.Iterable[typehints.Union[int, float]], foo, [int, float])
-
-  def testBinOp(self):
-    self.assertReturnType(int, lambda a, b: a + b, [int, int])
-    self.assertReturnType(
-        typehints.Any, lambda a, b: a + b, [int, typehints.Any])
-    self.assertReturnType(
-        typehints.List[typehints.Union[int, str]], lambda a, b: a + b,
-        [typehints.List[int], typehints.List[str]])
-
-  def testCall(self):
-    f = lambda x, *args: x
-    self.assertReturnType(
-        typehints.Tuple[int, float], lambda: (f(1), f(2.0, 3)))
-
-  def testClosure(self):
-    x = 1
-    y = 1.0
-    self.assertReturnType(typehints.Tuple[int, float], lambda: (x, y))
-
-  def testGlobals(self):
-    self.assertReturnType(int, lambda: global_int)
-
-  def testBuiltins(self):
-    self.assertReturnType(int, lambda x: len(x), [typehints.Any])
-
-  def testGetAttr(self):
-    self.assertReturnType(
-        typehints.Tuple[str, typehints.Any],
-        lambda: (typehints.__doc__, typehints.fake))
-
-  def testMethod(self):
-
-    class A(object):
-
-      def m(self, x):
-        return x
-
-    self.assertReturnType(int, lambda: A().m(3))
-    self.assertReturnType(float, lambda: A.m(A(), 3.0))
-
-  def testAlwaysReturnsEarly(self):
-
-    def some_fn(v):
-      if v:
-        return 1
-      else:
-        return 2
-
-    self.assertReturnType(int, some_fn)
-
-  def testDict(self):
-    self.assertReturnType(
-        typehints.Dict[typehints.Any, typehints.Any], lambda: {})
-
-  def testDictComprehension(self):
-    # Just ensure it doesn't crash.
-    fields = []
-    self.assertReturnType(
-        typehints.Any,
-        lambda row: {f: row[f] for f in fields}, [typehints.Any])
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/typecheck.py b/sdks/python/google/cloud/dataflow/typehints/typecheck.py
deleted file mode 100644
index 7dad46e..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/typecheck.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Runtime type checking support."""
-
-import collections
-import inspect
-import sys
-import types
-
-from google.cloud.dataflow.pvalue import SideOutputValue
-from google.cloud.dataflow.transforms.core import DoFn
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.typehints import check_constraint
-from google.cloud.dataflow.typehints import CompositeTypeHintError
-from google.cloud.dataflow.typehints import GeneratorWrapper
-from google.cloud.dataflow.typehints import SimpleTypeHintError
-from google.cloud.dataflow.typehints import TypeCheckError
-from google.cloud.dataflow.typehints.decorators import _check_instance_type
-from google.cloud.dataflow.typehints.decorators import getcallargs_forhints
-
-
-class TypeCheckWrapperDoFn(DoFn):
-  """A wrapper around a DoFn which performs type-checking of input and output.
-  """
-
-  def __init__(self, dofn, type_hints, label=None):
-    super(TypeCheckWrapperDoFn, self).__init__()
-    self._dofn = dofn
-    self._label = label
-    self._process_fn = self._dofn.process_argspec_fn()
-    if type_hints.input_types:
-      input_args, input_kwargs = type_hints.input_types
-      self._input_hints = getcallargs_forhints(
-          self._process_fn, *input_args, **input_kwargs)
-    else:
-      self._input_hints = None
-    # TODO(robertwb): Actually extract this.
-    self.context_var = 'context'
-    # TODO(robertwb): Multi-output.
-    self._output_type_hint = type_hints.simple_output_type(label)
-
-  def start_bundle(self, context, *args, **kwargs):
-    return self._type_check_result(
-        self._dofn.start_bundle(context, *args, **kwargs))
-
-  def finish_bundle(self, context, *args, **kwargs):
-    return self._type_check_result(
-        self._dofn.finish_bundle(context, *args, **kwargs))
-
-  def process(self, context, *args, **kwargs):
-    if self._input_hints:
-      actual_inputs = inspect.getcallargs(
-          self._process_fn, context.element, *args, **kwargs)
-      for var, hint in self._input_hints.items():
-        if hint is actual_inputs[var]:
-          # self parameter
-          continue
-        var_name = var + '.element' if var == self.context_var else var
-        _check_instance_type(hint, actual_inputs[var], var_name, True)
-    return self._type_check_result(self._dofn.process(context, *args, **kwargs))
-
-  def _type_check_result(self, transform_results):
-    if self._output_type_hint is None or transform_results is None:
-      return transform_results
-
-    def type_check_output(o):
-      # TODO(robertwb): Multi-output.
-      x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o
-      self._type_check(self._output_type_hint, x, is_input=False)
-
-    # If the return type is a generator, then we will need to interleave our
-    # type-checking with its normal iteration so we don't deplete the
-    # generator initially just by type-checking its yielded contents.
-    if isinstance(transform_results, types.GeneratorType):
-      return GeneratorWrapper(transform_results, type_check_output)
-    else:
-      for o in transform_results:
-        type_check_output(o)
-      return transform_results
-
-  def _type_check(self, type_constraint, datum, is_input):
-    """Typecheck a PTransform related datum according to a type constraint.
-
-    This function is used to optionally type-check either an input or an output
-    to a PTransform.
-
-    Args:
-        type_constraint: An instance of a typehints.TypeConstraint, one of the
-          white-listed builtin Python types, or a custom user class.
-        datum: An instance of a Python object.
-        is_input: True if 'datum' is an input to a PTransform's DoFn. False
-          otherwise.
-
-    Raises:
-      TypeCheckError: If 'datum' fails to type-check according to 'type_constraint'.
-    """
-    datum_type = 'input' if is_input else 'output'
-
-    try:
-      check_constraint(type_constraint, datum)
-    except CompositeTypeHintError as e:
-      raise TypeCheckError, e.message, sys.exc_info()[2]
-    except SimpleTypeHintError:
-      error_msg = ("According to type-hint expected %s should be of type %s. "
-                   "Instead, received '%s', an instance of type %s."
-                   % (datum_type, type_constraint, datum, type(datum)))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
-
-
-class OutputCheckWrapperDoFn(DoFn):
-  """A DoFn that verifies against common errors in the output type."""
-
-  def __init__(self, dofn, full_label):
-    self.dofn = dofn
-    self.full_label = full_label
-
-  def run(self, method, context, args, kwargs):
-    try:
-      result = method(context, *args, **kwargs)
-    except TypeCheckError as e:
-      error_msg = ('Runtime type violation detected within ParDo(%s): '
-                   '%s' % (self.full_label, e))
-      raise TypeCheckError, error_msg, sys.exc_info()[2]
-    else:
-      return self._check_type(result)
-
-  def start_bundle(self, context, *args, **kwargs):
-    return self.run(self.dofn.start_bundle, context, args, kwargs)
-
-  def finish_bundle(self, context, *args, **kwargs):
-    return self.run(self.dofn.finish_bundle, context, args, kwargs)
-
-  def process(self, context, *args, **kwargs):
-    return self.run(self.dofn.process, context, args, kwargs)
-
-  def _check_type(self, output):
-    if output is None:
-      return output
-    elif isinstance(output, (dict, basestring)):
-      object_type = type(output).__name__
-      raise TypeCheckError('Returning a %s from a ParDo or FlatMap is '
-                           'discouraged. Please use list("%s") if you really '
-                           'want this behavior.' %
-                           (object_type, output))
-    elif not isinstance(output, collections.Iterable):
-      raise TypeCheckError('FlatMap and ParDo must return an '
-                           'iterable. %s was returned instead.'
-                           % type(output))
-    return output