You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:39 UTC
[04/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/window.py b/sdks/python/google/cloud/dataflow/transforms/window.py
deleted file mode 100644
index 6c0c2e8..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/window.py
+++ /dev/null
@@ -1,383 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Windowing concepts.
-
-A WindowInto transform logically divides up or groups the elements of a
-PCollection into finite windows according to a windowing function (derived from
-WindowFn).
-
-The output of WindowInto contains the same elements as its input, but they have
-been logically assigned to windows. Subsequent GroupByKey transforms, including
-any within a composite transform, group by the combination of keys and windows.
-
-Windowing a PCollection allows chunks of it to be processed individually, before
-the entire PCollection is available. This is especially important for
-PCollection(s) with unbounded size: the full PCollection is never available
-at once, because more data is continually arriving. For PCollection(s)
-with a bounded size (a.k.a. conventional batch mode), by default, all data is
-implicitly in a single window (see GlobalWindows), unless WindowInto is
-applied.
-
-For example, a simple form of windowing divides up the data into fixed-width
-time intervals, using FixedWindows.
-
-Seconds are used as the time unit for the built-in windowing primitives here.
-Integer or floating point seconds can be passed to these primitives.
-
-Internally, seconds, with microsecond granularity, are stored as
-timeutil.Timestamp and timeutil.Duration objects. This is done to avoid
-precision errors that would occur with floating point representations.
-
-Custom windowing function classes can be created, by subclassing from
-WindowFn.
-"""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow.transforms import timeutil
-from google.cloud.dataflow.transforms.timeutil import Duration
-from google.cloud.dataflow.transforms.timeutil import MAX_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import MIN_TIMESTAMP
-from google.cloud.dataflow.transforms.timeutil import Timestamp
-
-
# TODO(ccy): revisit naming and semantics once Java Apache Beam finalizes their
# behavior.
class OutputTimeFn(object):
  """Determines how output timestamps of grouping operations are assigned.

  Each OUTPUT_AT_* constant names a policy; get_impl() turns a policy name
  into the timeutil object that implements it.
  """

  OUTPUT_AT_EOW = 'OUTPUT_AT_EOW'
  OUTPUT_AT_EARLIEST = 'OUTPUT_AT_EARLIEST'
  OUTPUT_AT_LATEST = 'OUTPUT_AT_LATEST'
  OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED'

  @staticmethod
  def get_impl(output_time_fn, window_fn):
    """Builds the implementation object for a policy name.

    Args:
      output_time_fn: One of the OUTPUT_AT_* constants of this class.
      window_fn: The WindowFn in use; only passed on for the
        OUTPUT_AT_EARLIEST_TRANSFORMED policy.

    Returns:
      A new timeutil implementation instance for the policy.

    Raises:
      ValueError: If output_time_fn is not one of the known policies.
    """
    # Constructors are wrapped in lambdas so that only the selected
    # implementation is looked up and instantiated.
    constructors = (
        (OutputTimeFn.OUTPUT_AT_EOW,
         lambda: timeutil.OutputAtEndOfWindowImpl()),
        (OutputTimeFn.OUTPUT_AT_EARLIEST,
         lambda: timeutil.OutputAtEarliestInputTimestampImpl()),
        (OutputTimeFn.OUTPUT_AT_LATEST,
         lambda: timeutil.OutputAtLatestInputTimestampImpl()),
        (OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED,
         lambda: timeutil.OutputAtEarliestTransformedInputTimestampImpl(
             window_fn)),
    )
    for policy, construct in constructors:
      if output_time_fn == policy:
        return construct()
    raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
-
-
class WindowFn(object):
  """Abstract windowing function: assigns elements to windows, may merge them.

  Subclasses override assign() and merge(); get_window_coder() and
  get_transformed_output_time() have usable defaults.
  """

  class AssignContext(object):
    """Information about one element, handed to WindowFn.assign().

    Attributes:
      timestamp: The element's timestamp as a timeutil.Timestamp.
      element: The element itself, if supplied.
      existing_windows: The windows the element is already in, if supplied.
    """

    def __init__(self, timestamp, element=None, existing_windows=None):
      self.timestamp = Timestamp.of(timestamp)
      self.element = element
      self.existing_windows = existing_windows

  class MergeContext(object):
    """Windows to consider plus a callback, handed to WindowFn.merge().

    Attributes:
      windows: A list copy of the windows passed to the constructor.
    """

    def __init__(self, windows):
      # Copy into a list; the argument may be any iterable.
      self.windows = list(windows)

    def merge(self, to_be_merged, merge_result):
      """Callback reporting that to_be_merged collapse into merge_result."""
      raise NotImplementedError

  def assign(self, assign_context):
    """Returns the windows for the element described by assign_context."""
    raise NotImplementedError

  def merge(self, merge_context):
    """Merges windows, reporting each merge via merge_context.merge()."""
    raise NotImplementedError

  def get_window_coder(self):
    """Returns the coder used for this function's windows (pickle-based)."""
    return coders.PickleCoder()

  def get_transformed_output_time(self, window, input_timestamp):  # pylint: disable=unused-argument
    """Maps an element's input timestamp to an output time for its window.

    When the Windowing uses OutputTimeFn.OUTPUT_AT_EARLIEST_TRANSFORMED, a
    window's output timestamp is the earliest value this method returns for
    the elements of that window.

    Arguments:
      window: Output window of element.
      input_timestamp: Input timestamp of element as a timeutil.Timestamp
        object.

    Returns:
      Transformed timestamp.
    """
    # The base implementation leaves the timestamp unchanged.
    return input_timestamp
-
-
class BoundedWindow(object):
  """A window for timestamps in range (-infinity, end).

  Subclasses are expected to override __eq__ (and __hash__ consistently).

  Attributes:
    end: End of window, as a timeutil.Timestamp.
  """

  def __init__(self, end):
    self.end = Timestamp.of(end)

  def __cmp__(self, other):
    # Python 2 three-way comparison, kept for backward compatibility.
    # Order first by endpoint, then arbitrarily (by hash).
    return cmp(self.end, other.end) or cmp(hash(self), hash(other))

  # Rich comparisons implementing exactly the ordering of __cmp__ above.
  # Python 3 ignores __cmp__ (and has no builtin cmp()), so without these
  # windows could not be sorted there.
  def __lt__(self, other):
    return (self.end, hash(self)) < (other.end, hash(other))

  def __le__(self, other):
    return (self.end, hash(self)) <= (other.end, hash(other))

  def __gt__(self, other):
    return (self.end, hash(self)) > (other.end, hash(other))

  def __ge__(self, other):
    return (self.end, hash(self)) >= (other.end, hash(other))

  def __eq__(self, other):
    raise NotImplementedError

  def __ne__(self, other):
    # Python 2 does not derive != from __eq__; without this, != fell back to
    # __cmp__ (end, then hash), which can disagree with a subclass's __eq__.
    return not self == other

  def __hash__(self):
    return hash(self.end)

  def __repr__(self):
    return '[?, %s)' % float(self.end)
-
-
class IntervalWindow(BoundedWindow):
  """A window for timestamps in range [start, end).

  Attributes:
    start: Start of window as a timeutil.Timestamp (seconds since Unix epoch).
    end: End of window as a timeutil.Timestamp (seconds since Unix epoch).
  """

  def __init__(self, start, end):
    super(IntervalWindow, self).__init__(end)
    self.start = Timestamp.of(start)

  def __hash__(self):
    return hash((self.start, self.end))

  def __eq__(self, other):
    return self.start == other.start and self.end == other.end

  def __ne__(self, other):
    # Python 2 does not derive != from __eq__; keep the two consistent.
    return not self == other

  def __repr__(self):
    return '[%s, %s)' % (float(self.start), float(self.end))

  def intersects(self, other):
    """Returns True iff this window and other share at least one timestamp.

    Both windows are half-open, so windows that merely touch (one's end
    equals the other's start) do not intersect.
    """
    # Overlap needs BOTH conditions. Joining them with `or` (as before) made
    # this return True for every pair of non-empty windows, even disjoint ones.
    return other.start < self.end and self.start < other.end

  def union(self, other):
    """Returns the smallest IntervalWindow spanning both windows."""
    return IntervalWindow(
        min(self.start, other.start), max(self.end, other.end))
-
-
class WindowedValue(object):
  """A windowed value having a value, a timestamp and set of windows.

  Attributes:
    value: The underlying value of a windowed value.
    timestamp: Timestamp associated with the value, as a timeutil.Timestamp
      (seconds since Unix epoch).
    windows: An iterable of window objects (descendants of BoundedWindow)
      for the value; code in this module passes a list.
  """

  def __init__(self, value, timestamp, windows):
    self.value = value
    self.timestamp = Timestamp.of(timestamp)
    self.windows = windows

  def __repr__(self):
    return '(%s, %s, %s)' % (
        repr(self.value),
        'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
        'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
        float(self.timestamp),
        self.windows)

  def __hash__(self):
    windows = self.windows
    # A list of windows (as built by GlobalWindows.windowed_value) is
    # unhashable and made hash() raise TypeError; hash an equal tuple instead.
    # Equal objects still hash equally, since __eq__ compares windows directly.
    if isinstance(windows, list):
      windows = tuple(windows)
    return hash((self.value, self.timestamp, windows))

  def __eq__(self, other):
    return (type(self) == type(other)
            and self.value == other.value
            and self.timestamp == other.timestamp
            and self.windows == other.windows)

  def __ne__(self, other):
    # Python 2 does not derive != from __eq__ (it would compare identity).
    return not self == other

  def with_value(self, new_value):
    """Returns a new WindowedValue with new_value and this timestamp/windows."""
    return WindowedValue(new_value, self.timestamp, self.windows)
-
-
class TimestampedValue(object):
  """Pairs a value with the timestamp it should carry.

  Attributes:
    value: The wrapped value.
    timestamp: The value's timestamp, converted with Timestamp.of() from
      seconds since the Unix epoch (or an existing Timestamp).
  """

  def __init__(self, value, timestamp):
    self.value, self.timestamp = value, Timestamp.of(timestamp)
-
-
class GlobalWindow(BoundedWindow):
  """The window covering all time; GlobalWindows puts every element in it.

  Every GlobalWindow() call returns the same shared instance.
  """
  _instance = None

  def __new__(cls):
    if cls._instance is not None:
      return cls._instance
    cls._instance = super(GlobalWindow, cls).__new__(cls)
    return cls._instance

  def __init__(self):
    # Runs on each GlobalWindow() call, re-setting the same bounds.
    super(GlobalWindow, self).__init__(MAX_TIMESTAMP)
    self.start = MIN_TIMESTAMP

  def __repr__(self):
    return 'GlobalWindow'

  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    # All instances of the same global-window type compare equal.
    return type(other) is type(self)
-
-
class GlobalWindows(WindowFn):
  """Windowing function placing every element into the single GlobalWindow."""

  @classmethod
  def windowed_value(cls, value, timestamp=MIN_TIMESTAMP):
    """Wraps value in a WindowedValue that belongs to the global window."""
    windows = [GlobalWindow()]
    return WindowedValue(value, timestamp, windows)

  def assign(self, assign_context):
    """Assigns every element, regardless of context, to the global window."""
    windows = [GlobalWindow()]
    return windows

  def merge(self, merge_context):
    """No-op: the global window never merges."""
    return None

  def get_window_coder(self):
    """Returns a coder that always decodes to the GlobalWindow singleton."""
    return coders.SingletonCoder(GlobalWindow())

  def __hash__(self):
    return hash(type(self))

  def __eq__(self, other):
    # All GlobalWindows instances of the same type are interchangeable.
    return type(other) is type(self)

  def __ne__(self, other):
    return not (self == other)
-
-
class FixedWindows(WindowFn):
  """A windowing function that assigns each element to one time interval.

  A timestamp lands in the interval
    [N * size + offset, (N + 1) * size + offset)
  for the integer N that contains it.

  Attributes:
    size: Size of the window as seconds.
    offset: Offset of this window as seconds since Unix epoch. Windows start at
      t=N * size + offset where t=0 is the epoch. The offset must be a value
      in range [0, size). If it is not it will be normalized to this range.
  """

  def __init__(self, size, offset=0):
    if size <= 0:
      raise ValueError('The size parameter must be strictly positive.')
    self.size = Duration.of(size)
    # Reduce the offset into [0, size).
    self.offset = Timestamp.of(offset) % self.size

  def assign(self, context):
    ts = context.timestamp
    # How far ts lies past the most recent window boundary.
    into_window = (ts - self.offset) % self.size
    window_start = ts - into_window
    return [IntervalWindow(window_start, window_start + self.size)]

  def merge(self, merge_context):
    """No-op: fixed windows never merge."""
-
-
class SlidingWindows(WindowFn):
  """A windowing function that assigns each element to a set of sliding windows.

  The attributes size, period and offset determine in what time intervals a
  timestamp will be slotted. The time intervals have the following formula:
    [N * period + offset, N * period + offset + size)

  Attributes:
    size: Size of the windows as seconds.
    period: Period of the windows as seconds.
    offset: Offset of the windows as seconds since Unix epoch. Windows start
      at t=N * period + offset where t=0 is the epoch. The offset must be a
      value in range [0, period). If it is not it will be normalized to this
      range.
  """

  def __init__(self, size, period, offset=0):
    if size <= 0:
      raise ValueError('The size parameter must be strictly positive.')
    if period <= 0:
      raise ValueError('The period parameter must be strictly positive.')
    self.size = Duration.of(size)
    self.period = Duration.of(period)
    # Window starts repeat every period, so normalize the offset modulo the
    # period (as documented). Reducing it modulo the size produced wrongly
    # aligned windows whenever size was not a multiple of period.
    self.offset = Timestamp.of(offset) % self.period

  def assign(self, context):
    """Returns every window [s, s + size) that contains the timestamp.

    Windows are listed from the latest start to the earliest.
    """
    timestamp = context.timestamp
    # Latest window start at or before the timestamp.
    start = timestamp - (timestamp - self.offset) % self.period
    # Step back one period at a time with Timestamp/Duration arithmetic
    # (range() only steps in whole integers, so sub-second values were
    # truncated). Window [s, s + size) contains the timestamp iff
    # s > timestamp - size; the old bound (start - size) could also emit
    # windows ending at or before the timestamp.
    windows = []
    s = start
    while s > timestamp - self.size:
      windows.append(IntervalWindow(s, s + self.size))
      s -= self.period
    return windows

  def merge(self, merge_context):
    pass  # No merging.
-
-
class Sessions(WindowFn):
  """A windowing function that groups elements into sessions.

  Each element first gets its own window [timestamp, timestamp + gap_size).
  merge() then fuses overlapping windows, so a session is a run of events in
  which each event starts less than gap_size after the previous session
  window's end minus gap_size.

  Attributes:
    gap_size: Size of the gap between windows, as a timeutil.Duration.
  """

  def __init__(self, gap_size):
    if gap_size <= 0:
      raise ValueError('The size parameter must be strictly positive.')
    self.gap_size = Duration.of(gap_size)

  def assign(self, context):
    begin = context.timestamp
    return [IntervalWindow(begin, begin + self.gap_size)]

  def merge(self, merge_context):
    """Reports each run of overlapping windows to merge_context.merge()."""

    def flush(group, group_end):
      # A run of a single window needs no merging.
      if len(group) > 1:
        merge_context.merge(group, IntervalWindow(group[0].start, group_end))

    group, group_end = [], None
    for candidate in sorted(merge_context.windows, key=lambda w: w.start):
      if group and group_end > candidate.start:
        # Overlaps the current run: extend it.
        group.append(candidate)
        if candidate.end > group_end:
          group_end = candidate.end
      else:
        # Gap found (or first window): close the previous run, start anew.
        flush(group, group_end)
        group, group_end = [candidate], candidate.end
    flush(group, group_end)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/window_test.py b/sdks/python/google/cloud/dataflow/transforms/window_test.py
deleted file mode 100644
index 155239f..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/window_test.py
+++ /dev/null
@@ -1,201 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the windowing classes."""
-
-import unittest
-
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms import CombinePerKey
-from google.cloud.dataflow.transforms import combiners
-from google.cloud.dataflow.transforms import core
-from google.cloud.dataflow.transforms import Create
-from google.cloud.dataflow.transforms import GroupByKey
-from google.cloud.dataflow.transforms import Map
-from google.cloud.dataflow.transforms import window
-from google.cloud.dataflow.transforms import WindowInto
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-from google.cloud.dataflow.transforms.window import FixedWindows
-from google.cloud.dataflow.transforms.window import IntervalWindow
-from google.cloud.dataflow.transforms.window import Sessions
-from google.cloud.dataflow.transforms.window import SlidingWindows
-from google.cloud.dataflow.transforms.window import TimestampedValue
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.transforms.window import WindowFn
-
-
def context(element, timestamp, windows):
  """Test helper building a WindowFn.AssignContext for element at timestamp."""
  return WindowFn.AssignContext(
      timestamp, element=element, existing_windows=windows)
-
-
# Sorts the grouped values of each (key, values) pair so the assertions can
# compare against sorted lists. The pair is indexed rather than unpacked in the
# lambda's parameter list, a form Python 3 removed (PEP 3113).
sort_values = Map(lambda kv: (kv[0], sorted(kv[1])))
-
-
class ReifyWindowsFn(core.DoFn):
  """Re-keys each (key, values) pair as "key @ window", once per window."""

  def process(self, context):
    key, values = context.element
    # Named `win` so the imported `window` module is not shadowed.
    for win in context.windows:
      yield "%s @ %s" % (key, win), values


reify_windows = core.ParDo(ReifyWindowsFn())
-
class WindowTest(unittest.TestCase):

  def test_fixed_windows(self):
    # Test windows with offset: 2, 7, 12, 17, ...
    windowfn = window.FixedWindows(size=5, offset=2)
    self.assertEqual([window.IntervalWindow(7, 12)],
                     windowfn.assign(context('v', 7, [])))
    self.assertEqual([window.IntervalWindow(7, 12)],
                     windowfn.assign(context('v', 11, [])))
    self.assertEqual([window.IntervalWindow(12, 17)],
                     windowfn.assign(context('v', 12, [])))

    # Test windows without offset: 0, 5, 10, 15, ...
    windowfn = window.FixedWindows(size=5)
    self.assertEqual([window.IntervalWindow(5, 10)],
                     windowfn.assign(context('v', 5, [])))
    self.assertEqual([window.IntervalWindow(5, 10)],
                     windowfn.assign(context('v', 9, [])))
    self.assertEqual([window.IntervalWindow(10, 15)],
                     windowfn.assign(context('v', 10, [])))

    # Test windows with offset out of range.
    windowfn = window.FixedWindows(size=5, offset=12)
    self.assertEqual([window.IntervalWindow(7, 12)],
                     windowfn.assign(context('v', 11, [])))

  def test_sliding_windows_assignment(self):
    windowfn = SlidingWindows(size=15, period=5, offset=2)
    expected = [IntervalWindow(7, 22),
                IntervalWindow(2, 17),
                IntervalWindow(-3, 12)]
    self.assertEqual(expected, windowfn.assign(context('v', 7, [])))
    self.assertEqual(expected, windowfn.assign(context('v', 8, [])))
    self.assertEqual(expected, windowfn.assign(context('v', 11, [])))

  def test_sessions_merging(self):
    windowfn = Sessions(10)

    def merge(*timestamps):
      windows = [windowfn.assign(context(None, t, [])) for t in timestamps]
      running = set()

      class TestMergeContext(WindowFn.MergeContext):

        def __init__(self):
          super(TestMergeContext, self).__init__(running)

        def merge(self, to_be_merged, merge_result):
          for w in to_be_merged:
            if w in running:
              running.remove(w)
          running.add(merge_result)

      # Merge incrementally after each new window, then once more at the end.
      for ws in windows:
        running.update(ws)
        windowfn.merge(TestMergeContext())
      windowfn.merge(TestMergeContext())
      return sorted(running)

    self.assertEqual([IntervalWindow(2, 12)], merge(2))
    self.assertEqual([IntervalWindow(2, 12), IntervalWindow(19, 29)],
                     merge(2, 19))

    self.assertEqual([IntervalWindow(2, 19)], merge(2, 9))
    self.assertEqual([IntervalWindow(2, 19)], merge(9, 2))

    self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
                     merge(2, 9, 19))
    self.assertEqual([IntervalWindow(2, 19), IntervalWindow(19, 29)],
                     merge(19, 9, 2))

    self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))

  def timestamped_key_values(self, pipeline, key, *timestamps):
    return (pipeline | Create('start', timestamps)
            | Map(lambda x: WindowedValue((key, x), x, [])))

  def test_sliding_windows(self):
    p = Pipeline('DirectPipelineRunner')
    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
    result = (pcoll
              | WindowInto('w', SlidingWindows(period=2, size=4))
              | GroupByKey()
              | reify_windows)
    expected = [('key @ [-2.0, 2.0)', [1]),
                ('key @ [0.0, 4.0)', [1, 2, 3]),
                ('key @ [2.0, 6.0)', [2, 3])]
    assert_that(result, equal_to(expected))
    p.run()

  def test_sessions(self):
    p = Pipeline('DirectPipelineRunner')
    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
    result = (pcoll
              | WindowInto('w', Sessions(10))
              | GroupByKey()
              | sort_values
              | reify_windows)
    expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
                ('key @ [20.0, 45.0)', [20, 27, 35])]
    assert_that(result, equal_to(expected))
    p.run()

  def test_timestamped_value(self):
    p = Pipeline('DirectPipelineRunner')
    result = (p
              | Create('start', [(k, k) for k in range(10)])
              # Index the (value, timestamp) pair instead of using tuple
              # parameter unpacking, which Python 3 removed (PEP 3113).
              | Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
              | WindowInto('w', FixedWindows(5))
              | Map(lambda v: ('key', v))
              | GroupByKey())
    assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
                                  ('key', [5, 6, 7, 8, 9])]))
    p.run()

  def test_timestamped_with_combiners(self):
    p = Pipeline('DirectPipelineRunner')
    result = (p
              # Create some initial test values.
              | Create('start', [(k, k) for k in range(10)])
              # The purpose of the WindowInto transform is to establish a
              # FixedWindows windowing function for the PCollection.
              # It does not bucket elements into windows since the timestamps
              # from Create are not spaced 5 seconds apart and very likely
              # they all fall into the same window.
              | WindowInto('w', FixedWindows(5))
              # Generate timestamped values using the values as timestamps.
              # The timestamps are now 0..9 seconds, and since Map propagates
              # the windowing function from input to output, the output
              # PCollection has elements in the two 5-second windows [0, 5)
              # and [5, 10).
              | Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
              # We add a 'key' to each value representing the index of the
              # window. This is important since there is no guarantee of
              # order for the elements of a PCollection. Floor division keeps
              # the key an int on both Python 2 and Python 3.
              | Map(lambda v: (v // 5, v)))
    # Sum all elements associated with a key and window. Although it
    # is called CombinePerKey it is really CombinePerKeyAndWindow the
    # same way GroupByKey is really GroupByKeyAndWindow.
    sum_per_window = result | CombinePerKey(sum)
    # Compute mean per key and window.
    mean_per_window = result | combiners.Mean.PerKey()
    assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
                label='assert:sum')
    assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
                label='assert:mean')
    p.run()


if __name__ == '__main__':
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py b/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
deleted file mode 100644
index ef8e191..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/write_ptransform_test.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Unit tests for the write transform."""
-
-import logging
-import unittest
-
-import google.cloud.dataflow as df
-
-from google.cloud.dataflow.io import iobase
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.util import assert_that, is_empty
-from google.cloud.dataflow.utils.options import PipelineOptions
-
-
class _TestSink(iobase.Sink):
  """Sink double that records the arguments finalize_write() receives."""
  TEST_INIT_RESULT = 'test_init_result'

  def __init__(self, return_init_result=True, return_write_results=True):
    self.return_init_result = return_init_result
    self.return_write_results = return_write_results

  def initialize_write(self):
    # Returning None simulates a sink without an initialization result.
    return _TestSink.TEST_INIT_RESULT if self.return_init_result else None

  def finalize_write(self, init_result, writer_results):
    # Keep what the framework passed in so it can be inspected afterwards.
    self.init_result_at_finalize = init_result
    self.write_results_at_finalize = writer_results

  def open_writer(self, init_result, uid):
    return _TestWriter(init_result, uid, self.return_write_results)
-
-
class _TestWriter(iobase.Writer):
  """Writer double asserting it is used as: zero or more writes, then close."""
  STATE_UNSTARTED, STATE_WRITTEN, STATE_CLOSED = 0, 1, 2
  TEST_WRITE_RESULT = 'test_write_result'

  def __init__(self, init_result, uid, return_write_results=True):
    self.state = _TestWriter.STATE_UNSTARTED
    self.init_result = init_result
    self.uid = uid
    self.write_output = []
    self.return_write_results = return_write_results

  def close(self):
    # Closing is allowed before any write as well as after writes.
    assert self.state in (
        _TestWriter.STATE_WRITTEN, _TestWriter.STATE_UNSTARTED)
    self.state = _TestWriter.STATE_CLOSED
    return _TestWriter.TEST_WRITE_RESULT if self.return_write_results else None

  def write(self, value):
    # The first write happens while UNSTARTED; later writes while WRITTEN.
    expected_state = (_TestWriter.STATE_WRITTEN if self.write_output
                      else _TestWriter.STATE_UNSTARTED)
    assert self.state == expected_state
    self.state = _TestWriter.STATE_WRITTEN
    self.write_output.append(value)
-
-
class WriteToTestSink(PTransform):
  """Transform that writes its input to a fresh _TestSink on each apply().

  The sink created by the most recent apply() is kept in last_sink.
  """

  def __init__(self, return_init_result=True, return_write_results=True):
    # NOTE(review): PTransform.__init__ is not called; label is set directly.
    self.return_init_result = return_init_result
    self.return_write_results = return_write_results
    self.last_sink = None
    self.label = 'write_to_test_sink'

  def apply(self, pcoll):
    sink = _TestSink(return_init_result=self.return_init_result,
                     return_write_results=self.return_write_results)
    self.last_sink = sink
    return pcoll | df.io.Write(sink)
-
-
class WriteTest(unittest.TestCase):
  DATA = ['some data', 'more data', 'another data', 'yet another data']

  def _run_write_test(self,
                      data,
                      return_init_result=True,
                      return_write_results=True):
    """Runs data through WriteToTestSink in a fresh pipeline.

    Asserts that the Write transform's output is empty and that the
    transform created a sink.
    """
    transform = WriteToTestSink(return_init_result, return_write_results)
    pipeline = Pipeline(options=PipelineOptions([]))
    written = pipeline | df.Create('start', data) | transform

    assert_that(written, is_empty())
    pipeline.run()

    self.assertIsNotNone(transform.last_sink)

  def test_write(self):
    self._run_write_test(WriteTest.DATA)

  def test_write_with_empty_pcollection(self):
    self._run_write_test([])

  def test_write_no_init_result(self):
    self._run_write_test(WriteTest.DATA, return_init_result=False)

  def test_write_no_write_results(self):
    self._run_write_test(WriteTest.DATA, return_write_results=False)


if __name__ == '__main__':
  # Show INFO-level logs when the tests are run directly as a script.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/__init__.py b/sdks/python/google/cloud/dataflow/typehints/__init__.py
deleted file mode 100644
index 1585ad5..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A package defining the syntax and decorator semantics for type-hints."""
-
-# pylint: disable=wildcard-import
-from google.cloud.dataflow.typehints.typehints import *
-from google.cloud.dataflow.typehints.decorators import *
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/decorators.py b/sdks/python/google/cloud/dataflow/typehints/decorators.py
deleted file mode 100644
index 4e8182d..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/decorators.py
+++ /dev/null
@@ -1,530 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Type hinting decorators allowing static or runtime type-checking for the SDK.
-
-This module defines decorators which utilize the type-hints defined in
-'type_hints.py' to allow annotation of the types of function arguments and
-return values.
-
-Type-hints for functions are annotated using two separate decorators. One is for
-type-hinting the types of function arguments, the other for type-hinting the
-function return value. Type-hints can either be specified in the form of
-positional arguments::
-
- @with_input_types(int, int)
- def add(a, b):
- return a + b
-
-Keyword arguments::
-
- @with_input_types(a=int, b=int)
- def add(a, b):
- return a + b
-
-Or even a mix of both::
-
- @with_input_types(int, b=int)
- def add(a, b):
- return a + b
-
-Example usage for type-hinting arguments only::
-
- @with_input_types(s=str)
- def to_lower(a):
- return a.lower()
-
-Example usage for type-hinting return values only::
-
- @with_output_types(Tuple[int, bool])
- def compress_point(ec_point):
- return ec_point.x, ec_point.y < 0
-
-Example usage for type-hinting both arguments and return values::
-
- @with_input_types(a=int)
- @with_output_types(str)
- def int_to_str(a):
- return str(a)
-
-Type-hinting a function with arguments that unpack tuples are also supported. As
-an example, such a function would be defined as::
-
- def foo((a, b)):
- ...
-
-The valid type-hint for such as function looks like the following::
-
- @with_input_types(a=int, b=int)
- def foo((a, b)):
- ...
-
-Notice that we hint the type of each unpacked argument independently, rather
-than hinting the type of the tuple as a whole (Tuple[int, int]).
-
-Optionally, type-hints can be type-checked at runtime. To toggle this behavior
-this module defines two functions: 'enable_run_time_type_checking' and
-'disable_run_time_type_checking'. NOTE: for this toggle behavior to work
-properly it must appear at the top of the module where all functions are
-defined, or before importing a module containing type-hinted functions.
-"""
-
-import inspect
-import types
-
-from google.cloud.dataflow.typehints import check_constraint
-from google.cloud.dataflow.typehints import CompositeTypeHintError
-from google.cloud.dataflow.typehints import SimpleTypeHintError
-from google.cloud.dataflow.typehints import typehints
-from google.cloud.dataflow.typehints import validate_composite_type_param
-
# This is missing in the builtin types module. str.upper is arbitrary, any
# method on a C-implemented type will do.
# pylint: disable=invalid-name
_MethodDescriptorType = type(str.upper)
# pylint: enable=invalid-name


# Monkeypatch inspect.getargspec to allow passing non-function objects.
# This is needed to use higher-level functions such as getcallargs.
# The original is captured first so the replacement below can delegate to it
# (and so the replacement does not call itself through inspect).
_original_getargspec = inspect.getargspec


def getargspec(func):
  """Like inspect.getargspec, but also accepts classes and other callables.

  Args:
    func: The object whose argument specification is wanted.

  Returns:
    An inspect.ArgSpec. For a class: the spec of its __init__ with the first
    (self) argument removed. For a callable whose __call__ cannot be inspected
    either: a placeholder spec with one positional argument and
    '%unknown%'-prefixed varargs/keywords names.

  Raises:
    TypeError: Re-raised from the original getargspec when func is neither a
      class nor callable.
  """
  try:
    return _original_getargspec(func)
  except TypeError:
    if isinstance(func, (type, types.ClassType)):
      # Classes: inspect the constructor and drop its leading self argument.
      argspec = getargspec(func.__init__)
      del argspec.args[0]
      return argspec
    elif callable(func):
      try:
        return _original_getargspec(func.__call__)
      except TypeError:
        # Return an ArgSpec with at least one positional argument,
        # and any number of other (positional or keyword) arguments
        # whose name won't match any real argument.
        # Arguments with the %unknown% prefix will be ignored in the type
        # checking code.
        return inspect.ArgSpec(
            ['_'], '%unknown%varargs', '%unknown%keywords', ())
    else:
      raise

# Install the replacement on the inspect module itself, so every later lookup
# of inspect.getargspec in the process sees it.
inspect.getargspec = getargspec
-
-
class IOTypeHints(object):
  """Encapsulates all type hint information about a Dataflow construct.

  This should primarily be used via the WithTypeHints mixin class, though
  may also be attached to other objects (such as Python functions).

  Attributes:
    input_types: None, or an (args, kwargs) pair of input type hints.
    output_types: None, or an (args, kwargs) pair of output type hints.
  """
  __slots__ = ('input_types', 'output_types')

  def __init__(self, input_types=None, output_types=None):
    self.input_types = input_types
    self.output_types = output_types

  def set_input_types(self, *args, **kwargs):
    """Records the given positional/keyword hints as an (args, kwargs) pair."""
    self.input_types = args, kwargs

  def set_output_types(self, *args, **kwargs):
    """Records the given positional/keyword hints as an (args, kwargs) pair."""
    self.output_types = args, kwargs

  def simple_output_type(self, context):
    """Returns the single positional output hint, or None if none is set.

    Args:
      context: Describes the construct; used only in the error message.

    Raises:
      TypeError: If the output hint is not exactly one positional hint.
    """
    if not self.output_types:
      return None
    args, kwargs = self.output_types
    if len(args) != 1 or kwargs:
      raise TypeError('Expected simple output type hint for %s' % context)
    return args[0]

  def copy(self):
    return IOTypeHints(self.input_types, self.output_types)

  def with_defaults(self, hints):
    """Returns self with each unset field filled in from `hints`.

    If either side is empty (falsy), the other one is returned unchanged.
    """
    if not hints:
      return self
    elif not self:
      return hints
    else:
      return IOTypeHints(self.input_types or hints.input_types,
                         self.output_types or hints.output_types)

  def __nonzero__(self):
    return bool(self.input_types or self.output_types)

  # Python 3 name of the truth-value hook. Without it every instance is truthy
  # there, and with_defaults() never falls through to `hints`.
  __bool__ = __nonzero__

  def __repr__(self):
    return 'IOTypeHints[inputs=%s, outputs=%s]' % (
        self.input_types, self.output_types)
-
-
class WithTypeHints(object):
  """Mixin that lets instances carry settable, queryable type hints.

  Hints set on the instance win over default_type_hints(), which in turn win
  over hints attached to the instance's class.
  """

  def __init__(self, *unused_args, **unused_kwargs):
    self._type_hints = IOTypeHints()

  def _get_or_create_type_hints(self):
    """Returns this instance's IOTypeHints, creating it on first use."""
    # Subclasses are not required to call __init__, so the attribute may be
    # missing here.
    try:
      return self._type_hints
    except AttributeError:
      pass
    self._type_hints = IOTypeHints()
    return self._type_hints

  def get_type_hints(self):
    """Returns the effective hints after layering in all defaults."""
    own = self._get_or_create_type_hints()
    layered = own.with_defaults(self.default_type_hints())
    return layered.with_defaults(get_type_hints(self.__class__))

  def default_type_hints(self):
    """Hook for subclasses to supply fallback hints; none by default."""
    return None

  def with_input_types(self, *arg_hints, **kwarg_hints):
    """Sets input hints on this instance; returns self for chaining."""
    hints = self._get_or_create_type_hints()
    hints.set_input_types(*arg_hints, **kwarg_hints)
    return self

  def with_output_types(self, *arg_hints, **kwarg_hints):
    """Sets output hints on this instance; returns self for chaining."""
    hints = self._get_or_create_type_hints()
    hints.set_output_types(*arg_hints, **kwarg_hints)
    return self
-
-
class TypeCheckError(Exception):
  """Raised when a value or a call does not satisfy its declared type hints."""
-
-
def _positional_arg_hints(arg, hints):
  """Returns the type of a (possibly tuple-packed) positional argument.

  E.g. for lambda ((a, b), c): None the single positional argument is (as
  returned by inspect) [[a, b], c] which should have type
  Tuple[Tuple[int, Any], float] when applied to the type hints
  {a: int, b: Any, c: float}.

  Args:
    arg: An argument name, or a (nested) list of names for a tuple parameter.
    hints: A mapping from argument names to type hints.

  Returns:
    The hint for a plain name (Any if unhinted), or a Tuple hint built
    recursively for a list of names.
  """
  if not isinstance(arg, list):
    return hints.get(arg, typehints.Any)
  member_hints = [_positional_arg_hints(member, hints) for member in arg]
  return typehints.Tuple[member_hints]
-
-
def _unpack_positional_arg_hints(arg, hint):
  """Unpacks the given hint according to the nested structure of arg.

  For example, if arg is [[a, b], c] and hint is Tuple[Any, int], then
  this function would return ((Any, Any), int) so it can be used in conjunction
  with inspect.getcallargs.

  Args:
    arg: An argument name, or a (nested) list of names for a tuple parameter.
    hint: The type hint supplied for that argument.

  Returns:
    `hint` itself for a plain name; otherwise a tuple with one (recursively
    unpacked) entry per element of `arg`.

  Raises:
    TypeCheckError: If `arg` is a list and `hint` is not consistent with a
      tuple of that length.
  """
  if not isinstance(arg, list):
    return hint
  tuple_constraint = typehints.Tuple[[typehints.Any] * len(arg)]
  if not typehints.is_consistent_with(hint, tuple_constraint):
    # Raise this module's TypeCheckError (the one callers of these helpers
    # catch) rather than looking one up on the typehints module.
    raise TypeCheckError(
        'Bad tuple arguments for %s: expected %s, got %s' % (
            arg, tuple_constraint, hint))
  if isinstance(hint, typehints.TupleConstraint):
    return tuple(_unpack_positional_arg_hints(a, t)
                 for a, t in zip(arg, hint.tuple_types))
  # A consistent but non-Tuple hint (e.g. Any) says nothing about the members.
  return (typehints.Any,) * len(arg)
-
-
def getcallargs_forhints(func, *typeargs, **typekwargs):
  """Like inspect.getcallargs, but understands that Tuple[] and an Any unpack.

  Args:
    func: The callable whose signature the hints are bound to.
    *typeargs: Positional type hints, in parameter order.
    **typekwargs: Keyword type hints, by parameter name.

  Returns:
    A dict mapping parameter names of `func` to type hints. Parameters bound
    to their default are mapped to Any. The *args / **kwargs parameters get
    the hint given under their name, else Tuple[Any, ...] / Dict[Any, Any].

  Raises:
    TypeCheckError: If the hints cannot be bound to func's signature.
  """
  argspec = inspect.getargspec(func)
  # Turn Tuple[x, y] into (x, y) so getcallargs can do the proper unpacking.
  packed_typeargs = [_unpack_positional_arg_hints(arg, hint)
                     for (arg, hint) in zip(argspec.args, typeargs)]
  packed_typeargs += list(typeargs[len(packed_typeargs):])
  try:
    callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs)
  except TypeError as e:
    raise TypeCheckError(e)
  if argspec.defaults:
    # Declare any default arguments to be Any. Walking the arguments from the
    # end, the k-th one (0-based) pairs with defaults[-k - 1]; defaults[-k]
    # would wrongly pair the last argument with the *first* default.
    for k, var in enumerate(reversed(argspec.args)):
      if k >= len(argspec.defaults):
        break
      if callargs.get(var, None) is argspec.defaults[-k - 1]:
        callargs[var] = typehints.Any
  # Patch up varargs and keywords
  if argspec.varargs:
    callargs[argspec.varargs] = typekwargs.get(
        argspec.varargs, typehints.Tuple[typehints.Any, ...])
  if argspec.keywords:
    # TODO(robertwb): Consider taking the union of key and value types.
    callargs[argspec.keywords] = typekwargs.get(
        argspec.keywords, typehints.Dict[typehints.Any, typehints.Any])
  return callargs
-
-
def get_type_hints(fn):
  """Gets the type hint associated with an arbitrary object fn.

  Always returns a valid IOTypeHints object, creating one if necessary. When
  fn accepts new attributes the hints are stored on it as `_type_hints`;
  otherwise a fresh, unattached IOTypeHints is returned. For a C method
  descriptor, that object is pre-filled with the owning class as input type.
  """
  # pylint: disable=protected-access
  if hasattr(fn, '_type_hints'):
    return fn._type_hints
  try:
    fn._type_hints = IOTypeHints()
  except (AttributeError, TypeError):
    # Can't add arbitrary attributes to this object,
    # but might have some restrictions anyways...
    detached = IOTypeHints()
    if isinstance(fn, _MethodDescriptorType):
      detached.set_input_types(fn.__objclass__)
    return detached
  return fn._type_hints
  # pylint: enable=protected-access
-
-
def with_input_types(*positional_hints, **keyword_hints):
  """A decorator that records input type-hints on a callable.

  Hints may be given positionally (in the order of the function's formal
  arguments), by keyword (matching parameter names), or as a mix of both.
  This decorator only records the hints and returns the object unchanged; it
  does not itself check arguments at call time.

  To be used as::

    @with_input_types(s=str)  # just @with_input_types(str) will work too.
    def upper(s):
      return s.upper()

  Or::

    @with_input_types(ls=List[Tuple[int, int]])
    def increment(ls):
      return [(i + 1, j + 1) for (i, j) in ls]

  Args:
    *positional_hints: Positional type-hints in the same order as the
      function's formal arguments. Each is a built-in Python type or a
      TypeConstraint created by 'indexing' a CompositeTypeHint.
    **keyword_hints: Type-hints keyed by parameter name, with the same
      allowed values (a custom class is also accepted).

  Raises:
    Whatever validate_composite_type_param raises for an invalid hint. This
    validation only runs when the decorated object is a plain Python function.
    NOTE(review): the exception type comes from the typehints module; confirm
    it before relying on it.

  Returns:
    A decorator that attaches the hints (via get_type_hints) to its argument
    and returns that same object.
  """
  def annotate(f):
    if isinstance(f, types.FunctionType):
      every_hint = list(positional_hints) + list(keyword_hints.values())
      for hint in every_hint:
        validate_composite_type_param(
            hint, error_msg_prefix='All type hint arguments')
    hints = get_type_hints(f)
    hints.set_input_types(*positional_hints, **keyword_hints)
    return f
  return annotate
-
-
def with_output_types(*return_type_hint, **kwargs):
  """A decorator that records the return-value type-hint of a callable.

  Exactly one positional hint is accepted. A function returning several values
  should be annotated with 'Tuple[type_1, type_2]'. The hint is validated
  eagerly, when the decorator is created, and then attached to the decorated
  object, which is returned unchanged.

  This decorator is intended to be used like::

    @with_output_types(List[Coordinate])
    def parse_ints(ints):
      return [Coordinate.from_int(i) for i in ints]

  Or with a simple type-hint::

    @with_output_types(bool)
    def negate(p):
      return not p if p else p

  Args:
    *return_type_hint: A single type-hint for the return value: a built-in
      Python type or a 'TypeConstraint' created by 'indexing' a
      'CompositeTypeHint'.
    **kwargs: Not allowed; present only to reject keyword usage.

  Raises:
    ValueError: If any keyword arguments are given, or if the number of
      positional hints is not exactly one.
    Whatever validate_composite_type_param raises for an invalid hint.

  Returns:
    A decorator that attaches the hint to its argument and returns it.
  """
  if kwargs:
    raise ValueError("All arguments for the 'returns' decorator must be "
                     "positional arguments.")
  if len(return_type_hint) != 1:
    raise ValueError("'returns' accepts only a single positional argument. In "
                     "order to specify multiple return types, use the 'Tuple' "
                     "type-hint.")

  (single_hint,) = return_type_hint
  validate_composite_type_param(single_hint,
                                error_msg_prefix='All type hint arguments')

  def annotate(f):
    hints = get_type_hints(f)
    hints.set_output_types(single_hint)
    return f
  return annotate
-
-
def _check_instance_type(
    type_constraint, instance, var_name=None, verbose=False):
  """Checks `instance` against `type_constraint`, raising on a violation.

  Args:
    type_constraint: A 'TypeConstraint' instance or a built-in Python type.
    instance: The candidate object to check against 'type_constraint'.
    var_name: If 'instance' is an argument, the parameter's name in the
      original function definition; None means a return value.
    verbose: If true, simple violations include the offending value itself
      in the error message.

  Raises:
    TypeCheckError: If 'instance' fails to meet 'type_constraint'.
  """
  if var_name is None:
    hint_type = 'return type'
  else:
    hint_type = "argument: '%s'" % var_name

  try:
    check_constraint(type_constraint, instance)
  except SimpleTypeHintError:
    verbose_instance = '%s, ' % instance if verbose else ''
    raise TypeCheckError('Type-hint for %s violated. Expected an '
                         'instance of %s, instead found %san instance of %s.'
                         % (hint_type, type_constraint,
                            verbose_instance, type(instance)))
  except CompositeTypeHintError as e:
    raise TypeCheckError('Type-hint for %s violated: %s' % (hint_type, e))
-
-
def _interleave_type_check(type_constraint, var_name=None):
  """Lazily type-checks each value a generator yields.

  Can be applied as a decorator or called manually in a curried manner::

    @_interleave_type_check(List[int])
    def gen():
      yield 5

    gen = _interleave_type_check(Tuple[int, int], 'coord_gen')(gen)

  Checking happens as each value is yielded, so the generator never has to be
  depleted up front.

  Args:
    type_constraint: The TypeConstraint each yielded value is checked against.
    var_name: The argument name bound to the generator, if it is a function
      argument; used only in error messages.

  Returns:
    A function taking a generator and returning a GeneratorWrapper that runs
    the check on every yielded value. An already-wrapped generator is returned
    as-is to avoid nested wrapping.
  """
  def check_one(value):
    return _check_instance_type(type_constraint, value, var_name)

  def wrapper(gen):
    if isinstance(gen, GeneratorWrapper):
      return gen
    return GeneratorWrapper(gen, check_one)
  return wrapper
-
-
class GeneratorWrapper(object):
  """A wrapper around a generator, allows execution of a callback per yield.

  Additionally, wrapping a generator with this class allows one to assign
  arbitrary attributes to a generator object just as with a function object.

  Attributes:
    internal_gen: The wrapped iterator (typically a generator object). Each
      object it yields is passed to 'interleave_func' before being handed out.
    interleave_func: A callback accepting a single argument. It is called
      with each value yielded by the internal generator.
  """

  def __init__(self, gen, interleave_func):
    self.internal_gen = gen
    self.interleave_func = interleave_func

  def __getattr__(self, attr):
    # Only reached for names the wrapper does not define itself (send, throw,
    # close, gi_frame, ...); those are delegated to the wrapped generator.
    # next/__next__/__iter__ are real methods below, so they never get here.
    # TODO(laolu): May also want to intercept 'send' in the future if we move to
    # a GeneratorHint with 3 type-params:
    #   * Generator[send_type, return_type, yield_type]
    return getattr(self.internal_gen, attr)

  def next(self):
    """Advances the generator, passes the value to the callback, returns it."""
    next_val = next(self.internal_gen)
    self.interleave_func(next_val)
    return next_val

  # Python 3 name of the iterator-protocol method.
  __next__ = next

  def __iter__(self):
    # A for loop ends cleanly on exhaustion. Letting StopIteration escape a
    # generator body becomes RuntimeError under PEP 479 (Python 3.7+).
    for x in self.internal_gen:
      self.interleave_func(x)
      yield x
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/opcodes.py b/sdks/python/google/cloud/dataflow/typehints/opcodes.py
deleted file mode 100644
index 9b5fd52..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/opcodes.py
+++ /dev/null
@@ -1,331 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Defines the actions various bytecodes have on the frame.
-
-Each function here corresponds to a bytecode documented in
-https://docs.python.org/2/library/dis.html. The first argument is a (mutable)
-FrameState object, the second the integer opcode argument.
-
-Bytecodes with more complicated behavior (e.g. modifying the program counter)
-are handled inline rather than here.
-"""
-import types
-
-from trivial_inference import union, element_type, Const, BoundMethod
-import typehints
-from typehints import Any, Dict, Iterable, List, Tuple, Union
-
-
def pop_one(state, unused_arg):
  # Discard up to one entry from the top of the stack.
  state.stack[-1:] = []


def pop_two(state, unused_arg):
  # Discard up to two entries from the top of the stack.
  state.stack[-2:] = []


def pop_three(state, unused_arg):
  # Discard up to three entries from the top of the stack.
  state.stack[-3:] = []


def push_value(v):
  """Returns an opcode handler that pushes `v` onto the stack."""

  def pusher(state, unused_arg):
    state.stack.append(v)

  return pusher


def nop(unused_state, unused_arg):
  """Handler for opcodes that leave the tracked stack untouched."""
-
-
def pop_top(state, unused_arg):
  """POP_TOP: discard the top of the stack."""
  del state.stack[-1]


def rot_n(state, n):
  """Moves the top stack entry down below the next n - 1 entries."""
  stack = state.stack
  top = stack[-1]
  below = stack[-n:-1]
  stack[-n:] = [top] + below


def rot_two(state, unused_arg):
  rot_n(state, 2)


def rot_three(state, unused_arg):
  rot_n(state, 3)


def rot_four(state, unused_arg):
  rot_n(state, 4)


def dup_top(state, unused_arg):
  """DUP_TOP: push a second reference to the top entry."""
  top = state.stack[-1]
  state.stack.append(top)
-
-
def unary(state, unused_arg):
  """Unary +, - and ~: the result keeps the operand's (unwrapped) type."""
  operand = state.stack[-1]
  state.stack[-1] = Const.unwrap(operand)


unary_positive = unary_negative = unary_invert = unary


def unary_not(state, unused_arg):
  """`not x` always produces a bool."""
  state.stack[-1] = bool


def unary_convert(state, unused_arg):
  """Backquote conversion (Python 2 repr) always produces a str."""
  state.stack[-1] = str


def get_iter(state, unused_arg):
  """GET_ITER: replace TOS with an Iterable of its element type."""
  iterable_type = state.stack.pop()
  state.stack.append(Iterable[element_type(iterable_type)])
-
-
def symmetric_binary_op(state, unused_arg):
  """Models a binary op whose result has the same type as its operands.

  Equal operand types give that type. Two sequence constraints of the same
  kind give that kind over the union of their element types. Anything else
  gives Any.
  """
  # TODO(robertwb): This may not be entirely correct...
  rhs = state.stack.pop()
  lhs = state.stack.pop()
  if lhs == rhs:
    result = lhs
  elif (type(lhs) == type(rhs)
        and isinstance(lhs, typehints.SequenceTypeConstraint)):
    result = type(lhs)(union(element_type(lhs), element_type(rhs)))
  else:
    result = Any
  state.stack.append(result)
# Except for int ** -int
binary_power = inplace_power = symmetric_binary_op
binary_multiply = inplace_multiply = symmetric_binary_op
binary_divide = inplace_divide = symmetric_binary_op
binary_floor_divide = inplace_floor_divide = symmetric_binary_op
-
-
def binary_true_divide(state, unused_arg):
  """Models `/` under true division, where int / int produces a float.

  Both operands are popped and replaced by the union of their types, with a
  plain int result widened to float.
  """
  # Pop both operands explicitly so the stack shrinks by one. Passing the
  # uncalled bound method `pop` as the second operand left the left operand
  # on the stack and unioned with a method object.
  divisor = state.stack.pop()
  dividend = state.stack.pop()
  u = union(divisor, dividend)
  if u == int:
    state.stack.append(float)
  else:
    state.stack.append(u)


inplace_true_divide = binary_true_divide
-
binary_modulo = inplace_modulo = symmetric_binary_op
# TODO(robertwb): Tuple add.
binary_add = inplace_add = symmetric_binary_op
binary_subtract = inplace_subtract = symmetric_binary_op


def binary_subscr(state, unused_arg):
  """BINARY_SUBSCR: TOS = TOS1[TOS].

  Pops the index and the container and pushes the element type. For str and
  unicode, indexing yields the container's own type.
  """
  # The index is TOS; the container being subscripted sits below it. Both
  # are consumed.
  unused_index = state.stack.pop()
  base = Const.unwrap(state.stack.pop())
  if base in (str, unicode):
    out = base
  else:
    out = element_type(base)
  state.stack.append(out)

# As far as types are concerned.
binary_lshift = inplace_lshift = binary_rshift = inplace_rshift = pop_top

binary_and = inplace_and = symmetric_binary_op
binary_xor = inplace_xor = symmetric_binary_op
# Handlers are matched to opcodes by upper-cased name, so this must be spelled
# inplace_or for INPLACE_OR to be handled.
binary_or = inplace_or = symmetric_binary_op


def _popper(n):
  """Returns an opcode handler that discards the top n stack entries."""

  def popper(state, unused_arg):
    del state.stack[-n:]

  return popper


# As far as types are concerned, SLICE+n keeps the sliced object's type and
# only its bounds are dropped.
slice_0 = nop
slice_1 = slice_2 = pop_top
slice_3 = pop_two
# STORE_SLICE+n consumes the value, the object and 0-2 bounds.
# DELETE_SLICE+n consumes the object and 0-2 bounds. Neither pushes a result.
store_slice_0 = pop_two
store_slice_1 = store_slice_2 = pop_three
store_slice_3 = _popper(4)
delete_slice_0 = pop_top
delete_slice_1 = delete_slice_2 = pop_two
delete_slice_3 = pop_three


def store_subscr(state, unused_args):
  """STORE_SUBSCR: TOS1[TOS] = TOS2; consumes all three entries."""
  # TODO(robertwb): Update element/value type of iterable/dict.
  del state.stack[-3:]
-
# print_expr
print_item = pop_top
# print_item_to
print_newline = nop

# print_newline_to


# break_loop
# continue_loop
def list_append(state, arg):
  """LIST_APPEND(arg): appends TOS to the list found at stack[-arg].

  `arg` is counted after TOS has been popped. The list's element type becomes
  the union of its previous element type and the appended value's type.
  """
  # Pop first: reading stack[-arg] before removing TOS would look one slot
  # too high in the stack.
  new_element_type = Const.unwrap(state.stack.pop())
  state.stack[-arg] = List[Union[element_type(state.stack[-arg]),
                                 new_element_type]]
-
-
load_locals = push_value(Dict[str, Any])

# return_value
# yield_value
# import_star
exec_stmt = pop_three
# pop_block
# end_finally


def build_class(state, unused_arg):
  """BUILD_CLASS: replaces the methods dict, bases tuple and name with the
  new class object (typed as Any)."""
  # Net stack effect is -2: three inputs consumed, one class pushed.
  state.stack[-3:] = [Any]
-
-# setup_with
-# with_cleanup
-
-
-# store_name
-# delete_name
def unpack_sequence(state, arg):
  """UNPACK_SEQUENCE(arg): replaces TOS with `arg` item types.

  Items are pushed in reverse, so the first item ends up on top of the stack.
  """
  seq_type = state.stack.pop()
  fallback = [Any] * arg
  if isinstance(seq_type, Const):
    try:
      item_types = [Const(item) for item in seq_type.value]
    except TypeError:
      item_types = fallback
    else:
      if len(item_types) != arg:
        item_types = fallback
  elif (isinstance(seq_type, typehints.TupleHint.TupleConstraint)
        and len(seq_type.tuple_types) == arg):
    item_types = list(seq_type.tuple_types)
  else:
    item_types = [element_type(seq_type)] * arg
  state.stack.extend(reversed(item_types))
-
-
def dup_topx(state, arg):
  """DUP_TOPX(arg): duplicates the top `arg` stack entries, keeping order."""
  # The entries live on state.stack; FrameState itself is not subscriptable.
  state.stack += state.stack[-arg:]
-
-
# STORE_ATTR (TOS.name = TOS1) consumes both the object and the value.
# DELETE_ATTR (del TOS.name) consumes the object.
store_attr = pop_two
delete_attr = pop_top
store_global = pop_top
delete_global = nop


def load_const(state, arg):
  """LOAD_CONST: pushes the Const for the code object's constant `arg`."""
  state.stack.append(state.const_type(arg))


load_name = push_value(Any)
-
-
def build_tuple(state, arg):
  """BUILD_TUPLE(arg): replaces the top `arg` entries with their Tuple type."""
  if not arg:
    state.stack.append(Tuple[()])
    return
  item_types = [Const.unwrap(t) for t in state.stack[-arg:]]
  state.stack[-arg:] = [Tuple[item_types]]


def build_list(state, arg):
  """BUILD_LIST(arg): replaces the top `arg` entries with a List type.

  The element type is the union of the entries' types.
  """
  if not arg:
    state.stack.append(List[Union[()]])
    return
  element_union = reduce(union, state.stack[-arg:], Union[()])
  state.stack[-arg:] = [List[element_union]]


build_map = push_value(Dict[Any, Any])
-
-
def load_attr(state, arg):
  """LOAD_ATTR: replaces TOS with the type of TOS.<name>."""
  obj = state.stack.pop()
  name = state.get_name(arg)
  if isinstance(obj, Const) and hasattr(obj.value, name):
    result = Const(getattr(obj.value, name))
  elif (isinstance(obj, (type, types.ClassType))
        and isinstance(getattr(obj, name, None), types.MethodType)):
    # Only the instance's class is known: remember the method looked up on
    # the class so it can later be treated as bound.
    result = Const(BoundMethod(getattr(obj, name)))
  else:
    result = Any
  state.stack.append(result)
-
-
def compare_op(state, unused_arg):
  """COMPARE_OP: replaces the two operands with bool."""
  # Could really be anything...
  del state.stack[-2:]
  state.stack.append(bool)
-
-
def import_name(state, unused_arg):
  """IMPORT_NAME: replaces the top two entries with Any for the module."""
  del state.stack[-2:]
  state.stack.append(Any)


import_from = push_value(Any)
-
-# jump
-
-# for_iter
-
-
def load_global(state, arg):
  """LOAD_GLOBAL: pushes the type FrameState.get_global reports for `arg`."""
  global_type = state.get_global(arg)
  state.stack.append(global_type)

# setup_loop
# setup_except
# setup_finally

store_map = pop_two
-
-
def load_fast(state, arg):
  """LOAD_FAST: pushes the type of local variable number `arg`."""
  local_type = state.vars[arg]
  state.stack.append(local_type)


def store_fast(state, arg):
  """STORE_FAST: pops TOS into local variable number `arg`."""
  value_type = state.stack.pop()
  state.vars[arg] = value_type


def delete_fast(state, arg):
  # Reading the variable afterwards would really be an error.
  state.vars[arg] = Any


def load_closure(state, unused_arg):
  # The pushed object is really a cell.
  state.stack.append(Any)


def load_deref(state, arg):
  """LOAD_DEREF: pushes the type of closure variable `arg`."""
  state.stack.append(state.closure_type(arg))
# raise_varargs
-
-
def call_function(state, arg, has_var=False, has_kw=False):
  """Replaces a call's callable and arguments on the stack with Any.

  Args:
    state: The FrameState being updated.
    arg: The CALL_FUNCTION* opcode argument. Its low byte is the number of
      positional arguments. Its high byte is the number of keyword arguments,
      each occupying two stack slots (name and value).
    has_var: True if a *args sequence is also on the stack.
    has_kw: True if a **kwargs mapping is also on the stack.
  """
  # TODO(robertwb): Recognize builtins and dataflow objects
  # (especially special return values).
  positional_count = arg & 0xFF
  keyword_count = (arg >> 8) & 0xFF
  # +1 for the callable itself, which sits below its arguments.
  pop_count = positional_count + 2 * keyword_count + 1 + has_var + has_kw
  state.stack[-pop_count:] = [Any]


def make_function(state, arg):
  # Consumes the code object plus `arg` default values.
  state.stack[-arg - 1:] = [Any]  # a callable


def make_closure(state, arg):
  # Additionally consumes the tuple of closure cells.
  state.stack[-arg - 2:] = [Any]  # a callable


def build_slice(state, arg):
  state.stack[-arg:] = [Any]  # a slice object


def call_function_var(state, arg):
  call_function(state, arg, has_var=True)


def call_function_kw(state, arg):
  call_function(state, arg, has_kw=True)


def call_function_var_kw(state, arg):
  call_function(state, arg, has_var=True, has_kw=True)


# Handlers are matched to opcodes by upper-cased name, so the handler must be
# spelled call_function_var_kw (CALL_FUNCTION_VAR_KW). The old misspelled
# name is kept as an alias for any direct callers.
call_function_var_wk = call_function_var_kw
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py b/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
deleted file mode 100644
index dd117d3..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/trivial_inference.py
+++ /dev/null
@@ -1,415 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Trivial type inference for simple functions.
-"""
-import __builtin__
-import collections
-import dis
-import pprint
-import sys
-import types
-
-from google.cloud.dataflow.typehints import Any
-from google.cloud.dataflow.typehints import typehints
-
-
class TypeInferenceError(ValueError):
  """Raised when no type can be inferred for a value or function."""
-
-
def instance_to_type(o):
  """Given a Python object o, return the corresponding type hint.

  None maps to None. Objects whose type is not in
  typehints.DISALLOWED_PRIMITIVE_TYPES map to that type (old-style instances
  to their class, BoundMethod wrappers to types.MethodType). Tuples, lists,
  sets and dicts map to composite hints built from their contents.

  Raises:
    TypeInferenceError: For a disallowed primitive type not handled here.
  """
  t = type(o)
  if o is None:
    # TODO(robertwb): Eliminate inconsistent use of None vs. NoneType.
    return None
  if t not in typehints.DISALLOWED_PRIMITIVE_TYPES:
    if t == types.InstanceType:
      return o.__class__
    if t == BoundMethod:
      return types.MethodType
    return t
  if t == tuple:
    return typehints.Tuple[[instance_to_type(item) for item in o]]
  if t == list:
    member_types = [instance_to_type(item) for item in o]
    return typehints.List[typehints.Union[member_types]]
  if t == set:
    member_types = [instance_to_type(item) for item in o]
    return typehints.Set[typehints.Union[member_types]]
  if t == dict:
    key_types = [instance_to_type(k) for k, _ in o.items()]
    value_types = [instance_to_type(v) for _, v in o.items()]
    return typehints.Dict[
        typehints.Union[key_types],
        typehints.Union[value_types],
    ]
  raise TypeInferenceError('Unknown forbidden type: %s' % t)
-
-
def union_list(xs, ys):
  """Returns the element-wise union of two equal-length lists of types."""
  assert len(xs) == len(ys)
  pairs = zip(xs, ys)
  return [union(left, right) for left, right in pairs]
-
-
class Const(object):
  """Wraps a concrete value known during inference, together with its type.

  Attributes:
    value: The wrapped Python object.
    type: instance_to_type(value), the type hint for the value.
  """

  def __init__(self, value):
    self.value = value
    self.type = instance_to_type(value)

  def __eq__(self, other):
    return isinstance(other, Const) and self.value == other.value

  def __ne__(self, other):
    # Python 2 does not derive != from __eq__; without this, two equal
    # Consts would also compare as "not equal".
    return not self == other

  def __hash__(self):
    return hash(self.value)

  def __repr__(self):
    return 'Const[%s]' % str(self.value)[:100]

  @staticmethod
  def unwrap(x):
    """Returns x.type when x is a Const, otherwise x itself."""
    if isinstance(x, Const):
      return x.type
    else:
      return x

  @staticmethod
  def unwrap_all(xs):
    """Returns a list of Const.unwrap applied to each element of xs."""
    return [Const.unwrap(x) for x in xs]
-
-
class FrameState(object):
  """Stores the state of the frame at a particular point of execution.

  Attributes:
    f: The function being analysed.
    co: f's code object.
    vars: A list holding the type of each local variable.
    stack: A list holding the type of each value-stack entry (top is last).
  """

  def __init__(self, f, local_vars=None, stack=()):
    self.f = f
    self.co = f.func_code
    # The default of None means "no locals" instead of failing in list(None).
    self.vars = list(local_vars) if local_vars is not None else []
    self.stack = list(stack)

  def __eq__(self, other):
    # Compare only with other FrameStates. Unvisited pcs are represented by
    # None (see the defaultdict in infer_return_type_func), and None has no
    # __dict__.
    return isinstance(other, FrameState) and self.__dict__ == other.__dict__

  def __ne__(self, other):
    # Python 2 does not derive != from __eq__; without this, != would compare
    # identities and two equal states would never be "unchanged".
    return not self == other

  def copy(self):
    return FrameState(self.f, self.vars, self.stack)

  def const_type(self, i):
    return Const(self.co.co_consts[i])

  def closure_type(self, i):
    """Returns Any for cell variables, else the Const of free variable i."""
    ncellvars = len(self.co.co_cellvars)
    if i < ncellvars:
      return Any
    else:
      return Const(self.f.func_closure[i - ncellvars].cell_contents)

  def get_global(self, i):
    """Returns a Const for the named global or builtin, or Any if unknown."""
    name = self.get_name(i)
    if name in self.f.func_globals:
      return Const(self.f.func_globals[name])
    if name in __builtin__.__dict__:
      return Const(__builtin__.__dict__[name])
    else:
      return Any

  def get_name(self, i):
    return self.co.co_names[i]

  def __repr__(self):
    return 'Stack: %s Vars: %s' % (self.stack, self.vars)

  def __or__(self, other):
    """Joins two states element-wise; None acts as the identity."""
    if other is None:
      return self.copy()
    return FrameState(self.f, union_list(self.vars, other.vars),
                      union_list(self.stack, other.stack))

  def __ror__(self, left):
    return self | left
-
-
def union(a, b):
  """Returns the union of two types or Const values.

  Equal inputs, or a falsy input, short-circuit. Otherwise Const values are
  unwrapped to their types. When both have the same hint class, one whose
  element type is the empty Union yields to the other.
  """
  if a == b:
    return a
  if not a:
    return b
  if not b:
    return a
  a, b = Const.unwrap(a), Const.unwrap(b)
  # TODO(robertwb): Work this into the Union code in a more generic way.
  empty = typehints.Union[()]
  same_kind = type(a) == type(b)
  if same_kind and element_type(a) == empty:
    return b
  if same_kind and element_type(b) == empty:
    return a
  return typehints.Union[a, b]
-
-
def element_type(hint):
  """Returns the element type of a composite type.

  Const values are unwrapped first. Sequence constraints give their inner
  type, tuple constraints the Union of their member types, and anything else
  gives Any.
  """
  unwrapped = Const.unwrap(hint)
  if isinstance(unwrapped, typehints.SequenceTypeConstraint):
    return unwrapped.inner_type
  if isinstance(unwrapped, typehints.TupleHint.TupleConstraint):
    return typehints.Union[unwrapped.tuple_types]
  return Any
-
-
def key_value_types(kv_type):
  """Returns the key and value type of a KV type.

  Only a two-element Tuple hint is understood; anything else gives
  (Any, Any).
  """
  # TODO(robertwb): Unions of tuples, etc.
  # TODO(robertwb): Assert?
  is_pair = (isinstance(kv_type, typehints.TupleHint.TupleConstraint)
             and len(kv_type.tuple_types) == 2)
  return kv_type.tuple_types if is_pair else (Any, Any)
-
-
# Return types of builtins that are known without any inference.
known_return_types = {len: int, hash: int}


class BoundMethod(object):
  """Used to create a bound method when we only know the type of the instance.

  Attributes:
    unbound: The method as looked up on the instance's class.
  """

  def __init__(self, unbound):
    self.unbound = unbound


def hashable(c):
  """Returns True if hash(c) succeeds, False if it raises TypeError."""
  try:
    hash(c)
  except TypeError:
    return False
  return True
-
-
def infer_return_type(c, input_types, debug=False, depth=5):
  """Analyses a callable to deduce its return type.

  Args:
    c: The callable to infer the return type of: a function, (bound) method,
      BoundMethod wrapper, or class. Anything else yields Any.
    input_types: A sequence of inputs corresponding to the input types.
    debug: Whether to print verbose debugging information. When true,
      unexpected errors propagate instead of yielding Any.
    depth: Passed through to infer_return_type_func.

  Returns:
    A TypeConstraint that the return value of this function will (likely)
    satisfy given the specified inputs, or Any if nothing can be inferred.
  """
  try:
    if hashable(c) and c in known_return_types:
      return known_return_types[c]
    elif isinstance(c, types.FunctionType):
      return infer_return_type_func(c, input_types, debug, depth)
    elif isinstance(c, types.MethodType):
      if c.im_self is not None:
        # Bound method: the instance becomes the first (self) argument.
        # list() so that tuple (or other sequence) input_types also work.
        input_types = [Const(c.im_self)] + list(input_types)
      return infer_return_type_func(c.im_func, input_types, debug, depth)
    elif isinstance(c, BoundMethod):
      # Only the instance's class is known; use it as the self type.
      input_types = [c.unbound.im_class] + list(input_types)
      return infer_return_type_func(c.unbound.im_func, input_types, debug, depth)
    elif isinstance(c, (type, types.ClassType)):
      if c in typehints.DISALLOWED_PRIMITIVE_TYPES:
        return {
            list: typehints.List[Any],
            set: typehints.Set[Any],
            tuple: typehints.Tuple[Any, ...],
            dict: typehints.Dict[Any, Any]
        }[c]
      else:
        return c
    else:
      return Any
  except TypeInferenceError:
    return Any
  except Exception:
    if debug:
      sys.stdout.flush()
      raise
    else:
      return Any
-
-
-def infer_return_type_func(f, input_types, debug=False, depth=0):
- """Analyses a function to deduce its return type.
-
- Args:
- f: A Python function object to infer the return type of.
- input_types: A sequence of inputs corresponding to the input types.
- debug: Whether to print verbose debugging information.
-
- Returns:
- A TypeConstraint that that the return value of this function will (likely)
- satisfy given the specified inputs.
-
- Raises:
- TypeInferenceError: if no type can be inferred.
- """
- if debug:
- print
- print f, id(f), input_types
- import opcodes
- simple_ops = dict((k.upper(), v) for k, v in opcodes.__dict__.items())
-
- co = f.func_code
- code = co.co_code
- end = len(code)
- pc = 0
- extended_arg = 0
- free = None
-
- yields = set()
- returns = set()
- # TODO(robertwb): Default args via inspect module.
- local_vars = list(input_types) + [typehints.Union[()]] * (len(co.co_varnames)
- - len(input_types))
- state = FrameState(f, local_vars)
- states = collections.defaultdict(lambda: None)
- jumps = collections.defaultdict(int)
-
- last_pc = -1
- while pc < end:
- start = pc
- op = ord(code[pc])
-
- if debug:
- print '-->' if pc == last_pc else ' ',
- print repr(pc).rjust(4),
- print dis.opname[op].ljust(20),
- pc += 1
- if op >= dis.HAVE_ARGUMENT:
- arg = ord(code[pc]) + ord(code[pc + 1]) * 256 + extended_arg
- extended_arg = 0
- pc += 2
- if op == dis.EXTENDED_ARG:
- extended_arg = arg * 65536L
- if debug:
- print str(arg).rjust(5),
- if op in dis.hasconst:
- print '(' + repr(co.co_consts[arg]) + ')',
- elif op in dis.hasname:
- print '(' + co.co_names[arg] + ')',
- elif op in dis.hasjrel:
- print '(to ' + repr(pc + arg) + ')',
- elif op in dis.haslocal:
- print '(' + co.co_varnames[arg] + ')',
- elif op in dis.hascompare:
- print '(' + dis.cmp_op[arg] + ')',
- elif op in dis.hasfree:
- if free is None:
- free = co.co_cellvars + co.co_freevars
- print '(' + free[arg] + ')',
-
- # Acutally emulate the op.
- if state is None and states[start] is None:
- # No control reaches here (yet).
- if debug:
- print
- continue
- state |= states[start]
-
- opname = dis.opname[op]
- jmp = jmp_state = None
- if opname.startswith('CALL_FUNCTION'):
- standard_args = (arg & 0xF) + (arg & 0xF0) / 8
- var_args = 'VAR' in opname
- kw_args = 'KW' in opname
- pop_count = standard_args + var_args + kw_args + 1
- if depth <= 0:
- return_type = Any
- elif arg & 0xF0:
- # TODO(robertwb): Handle this case.
- return_type = Any
- elif isinstance(state.stack[-pop_count], Const):
- # TODO(robertwb): Handle this better.
- if var_args or kw_args:
- state.stack[-1] = Any
- state.stack[-var_args - kw_args] = Any
- inputs = [] if pop_count == 1 else state.stack[1 - pop_count:]
- return_type = infer_return_type(state.stack[-pop_count].value,
- state.stack[1 - pop_count:],
- debug=debug,
- depth=depth - 1)
- else:
- return_type = Any
- state.stack[-pop_count:] = [return_type]
- elif opname in simple_ops:
- simple_ops[opname](state, arg)
- elif opname == 'RETURN_VALUE':
- returns.add(state.stack[-1])
- state = None
- elif opname == 'YIELD_VALUE':
- yields.add(state.stack[-1])
- elif opname == 'JUMP_FORWARD':
- jmp = pc + arg
- jmp_state = state
- state = None
- elif opname == 'JUMP_ABSOLUTE':
- jmp = arg
- jmp_state = state
- state = None
- elif opname in ('POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE'):
- state.stack.pop()
- jmp = arg
- jmp_state = state.copy()
- elif opname in ('JUMP_IF_TRUE_OR_POP', 'JUMP_IF_FALSE_OR_POP'):
- jmp = arg
- jmp_state = state.copy()
- state.stack.pop()
- elif opname == 'FOR_ITER':
- jmp = pc + arg
- jmp_state = state.copy()
- jmp_state.stack.pop()
- state.stack.append(element_type(state.stack[-1]))
- else:
- raise TypeInferenceError('unable to handle %s' % opname)
-
- if jmp is not None:
- # TODO(robertwb): Is this guerenteed to converge?
- new_state = states[jmp] | jmp_state
- if jmp < pc and new_state != states[jmp] and jumps[pc] < 5:
- jumps[pc] += 1
- pc = jmp
- states[jmp] = new_state
-
- if debug:
- print
- print state
- pprint.pprint(dict(item for item in states.items() if item[1]))
-
- if yields:
- result = typehints.Iterable[reduce(union, Const.unwrap_all(yields))]
- else:
- result = reduce(union, Const.unwrap_all(returns))
-
- if debug:
- print f, id(f), input_types, '->', result
- return result
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py b/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
deleted file mode 100644
index 5d945ba..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/trivial_inference_test.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests for google.cloud.dataflow.typehints.trivial_inference."""
-import unittest
-
-
-from google.cloud.dataflow.typehints import trivial_inference
-from google.cloud.dataflow.typehints import typehints
-
# Module-level int read by testGlobals to check inference of global loads.
global_int = 1
-
-
-class TrivialInferenceTest(unittest.TestCase):
-
- def assertReturnType(self, expected, f, inputs=()):
- self.assertEquals(expected, trivial_inference.infer_return_type(f, inputs))
-
- def testIdentity(self):
- self.assertReturnType(int, lambda x: x, [int])
-
- def testTuples(self):
- self.assertReturnType(
- typehints.Tuple[typehints.Tuple[()], int], lambda x: ((), x), [int])
- self.assertReturnType(
- typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
-
- def testUnpack(self):
- def reverse((a, b)):
- return b, a
- any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
- self.assertReturnType(
- typehints.Tuple[int, float], reverse, [typehints.Tuple[float, int]])
- self.assertReturnType(
- typehints.Tuple[int, int], reverse, [typehints.Tuple[int, ...]])
- self.assertReturnType(
- typehints.Tuple[int, int], reverse, [typehints.List[int]])
- self.assertReturnType(
- typehints.Tuple[typehints.Union[int, float, str],
- typehints.Union[int, float, str]],
- reverse, [typehints.Tuple[int, float, str]])
- self.assertReturnType(any_tuple, reverse, [typehints.Any])
-
- self.assertReturnType(typehints.Tuple[int, float],
- reverse, [trivial_inference.Const((1.0, 1))])
- self.assertReturnType(any_tuple,
- reverse, [trivial_inference.Const((1, 2, 3))])
-
- def testListComprehension(self):
- self.assertReturnType(
- typehints.List[int],
- lambda xs: [x for x in xs],
- [typehints.Tuple[int, ...]])
-
- def testTupleListComprehension(self):
- self.assertReturnType(
- typehints.List[int],
- lambda xs: [x for x in xs],
- [typehints.Tuple[int, int, int]])
- self.assertReturnType(
- typehints.List[typehints.Union[int, float]],
- lambda xs: [x for x in xs],
- [typehints.Tuple[int, float]])
-
- def testGenerator(self):
-
- def foo(x, y):
- yield x
- yield y
-
- self.assertReturnType(typehints.Iterable[int], foo, [int, int])
- self.assertReturnType(
- typehints.Iterable[typehints.Union[int, float]], foo, [int, float])
-
- def testBinOp(self):
- self.assertReturnType(int, lambda a, b: a + b, [int, int])
- self.assertReturnType(
- typehints.Any, lambda a, b: a + b, [int, typehints.Any])
- self.assertReturnType(
- typehints.List[typehints.Union[int, str]], lambda a, b: a + b,
- [typehints.List[int], typehints.List[str]])
-
- def testCall(self):
- f = lambda x, *args: x
- self.assertReturnType(
- typehints.Tuple[int, float], lambda: (f(1), f(2.0, 3)))
-
- def testClosure(self):
- x = 1
- y = 1.0
- self.assertReturnType(typehints.Tuple[int, float], lambda: (x, y))
-
- def testGlobals(self):
- self.assertReturnType(int, lambda: global_int)
-
- def testBuiltins(self):
- self.assertReturnType(int, lambda x: len(x), [typehints.Any])
-
- def testGetAttr(self):
- self.assertReturnType(
- typehints.Tuple[str, typehints.Any],
- lambda: (typehints.__doc__, typehints.fake))
-
- def testMethod(self):
-
- class A(object):
-
- def m(self, x):
- return x
-
- self.assertReturnType(int, lambda: A().m(3))
- self.assertReturnType(float, lambda: A.m(A(), 3.0))
-
- def testAlwaysReturnsEarly(self):
-
- def some_fn(v):
- if v:
- return 1
- else:
- return 2
-
- self.assertReturnType(int, some_fn)
-
- def testDict(self):
- self.assertReturnType(
- typehints.Dict[typehints.Any, typehints.Any], lambda: {})
-
- def testDictComprehension(self):
- # Just ensure it doesn't crash.
- fields = []
- self.assertReturnType(
- typehints.Any,
- lambda row: {f: row[f] for f in fields}, [typehints.Any])
-
-
if __name__ == '__main__':
  # Run the tests in this module when it is executed directly.
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/typehints/typecheck.py b/sdks/python/google/cloud/dataflow/typehints/typecheck.py
deleted file mode 100644
index 7dad46e..0000000
--- a/sdks/python/google/cloud/dataflow/typehints/typecheck.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Runtime type checking support."""
-
-import collections
-import inspect
-import sys
-import types
-
-from google.cloud.dataflow.pvalue import SideOutputValue
-from google.cloud.dataflow.transforms.core import DoFn
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.typehints import check_constraint
-from google.cloud.dataflow.typehints import CompositeTypeHintError
-from google.cloud.dataflow.typehints import GeneratorWrapper
-from google.cloud.dataflow.typehints import SimpleTypeHintError
-from google.cloud.dataflow.typehints import TypeCheckError
-from google.cloud.dataflow.typehints.decorators import _check_instance_type
-from google.cloud.dataflow.typehints.decorators import getcallargs_forhints
-
-
-class TypeCheckWrapperDoFn(DoFn):
- """A wrapper around a DoFn which performs type-checking of input and output.
- """
-
- def __init__(self, dofn, type_hints, label=None):
- super(TypeCheckWrapperDoFn, self).__init__()
- self._dofn = dofn
- self._label = label
- self._process_fn = self._dofn.process_argspec_fn()
- if type_hints.input_types:
- input_args, input_kwargs = type_hints.input_types
- self._input_hints = getcallargs_forhints(
- self._process_fn, *input_args, **input_kwargs)
- else:
- self._input_hints = None
- # TODO(robertwb): Actually extract this.
- self.context_var = 'context'
- # TODO(robertwb): Multi-output.
- self._output_type_hint = type_hints.simple_output_type(label)
-
- def start_bundle(self, context, *args, **kwargs):
- return self._type_check_result(
- self._dofn.start_bundle(context, *args, **kwargs))
-
- def finish_bundle(self, context, *args, **kwargs):
- return self._type_check_result(
- self._dofn.finish_bundle(context, *args, **kwargs))
-
- def process(self, context, *args, **kwargs):
- if self._input_hints:
- actual_inputs = inspect.getcallargs(
- self._process_fn, context.element, *args, **kwargs)
- for var, hint in self._input_hints.items():
- if hint is actual_inputs[var]:
- # self parameter
- continue
- var_name = var + '.element' if var == self.context_var else var
- _check_instance_type(hint, actual_inputs[var], var_name, True)
- return self._type_check_result(self._dofn.process(context, *args, **kwargs))
-
- def _type_check_result(self, transform_results):
- if self._output_type_hint is None or transform_results is None:
- return transform_results
-
- def type_check_output(o):
- # TODO(robertwb): Multi-output.
- x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o
- self._type_check(self._output_type_hint, x, is_input=False)
-
- # If the return type is a generator, then we will need to interleave our
- # type-checking with its normal iteration so we don't deplete the
- # generator initially just by type-checking its yielded contents.
- if isinstance(transform_results, types.GeneratorType):
- return GeneratorWrapper(transform_results, type_check_output)
- else:
- for o in transform_results:
- type_check_output(o)
- return transform_results
-
- def _type_check(self, type_constraint, datum, is_input):
- """Typecheck a PTransform related datum according to a type constraint.
-
- This function is used to optionally type-check either an input or an output
- to a PTransform.
-
- Args:
- type_constraint: An instance of a typehints.TypeContraint, one of the
- white-listed builtin Python types, or a custom user class.
- datum: An instance of a Python object.
- is_input: True if 'datum' is an input to a PTransform's DoFn. False
- otherwise.
-
- Raises:
- TypeError: If 'datum' fails to type-check according to 'type_constraint'.
- """
- datum_type = 'input' if is_input else 'output'
-
- try:
- check_constraint(type_constraint, datum)
- except CompositeTypeHintError as e:
- raise TypeCheckError, e.message, sys.exc_info()[2]
- except SimpleTypeHintError:
- error_msg = ("According to type-hint expected %s should be of type %s. "
- "Instead, received '%s', an instance of type %s."
- % (datum_type, type_constraint, datum, type(datum)))
- raise TypeCheckError, error_msg, sys.exc_info()[2]
-
-
-class OutputCheckWrapperDoFn(DoFn):
- """A DoFn that verifies against common errors in the output type."""
-
- def __init__(self, dofn, full_label):
- self.dofn = dofn
- self.full_label = full_label
-
- def run(self, method, context, args, kwargs):
- try:
- result = method(context, *args, **kwargs)
- except TypeCheckError as e:
- error_msg = ('Runtime type violation detected within ParDo(%s): '
- '%s' % (self.full_label, e))
- raise TypeCheckError, error_msg, sys.exc_info()[2]
- else:
- return self._check_type(result)
-
- def start_bundle(self, context, *args, **kwargs):
- return self.run(self.dofn.start_bundle, context, args, kwargs)
-
- def finish_bundle(self, context, *args, **kwargs):
- return self.run(self.dofn.finish_bundle, context, args, kwargs)
-
- def process(self, context, *args, **kwargs):
- return self.run(self.dofn.process, context, args, kwargs)
-
- def _check_type(self, output):
- if output is None:
- return output
- elif isinstance(output, (dict, basestring)):
- object_type = type(output).__name__
- raise TypeCheckError('Returning a %s from a ParDo or FlatMap is '
- 'discouraged. Please use list("%s") if you really '
- 'want this behavior.' %
- (object_type, output))
- elif not isinstance(output, collections.Iterable):
- raise TypeCheckError('FlatMap and ParDo must return an '
- 'iterable. %s was returned instead.'
- % type(output))
- return output