Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:43 UTC

[08/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/combiners_test.py b/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
deleted file mode 100644
index b8142ea..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/combiners_test.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for our libraries of combine PTransforms."""
-
-import unittest
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.transforms import combiners
-import google.cloud.dataflow.transforms.combiners as combine
-from google.cloud.dataflow.transforms.core import CombineGlobally
-from google.cloud.dataflow.transforms.core import Create
-from google.cloud.dataflow.transforms.core import Map
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.util import assert_that, equal_to
-
-
-class CombineTest(unittest.TestCase):
-
-  def test_builtin_combines(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    mean = sum(vals) / float(len(vals))
-    size = len(vals)
-
-    # First for global combines.
-    pcoll = pipeline | Create('start', vals)
-    result_mean = pcoll | combine.Mean.Globally('mean')
-    result_count = pcoll | combine.Count.Globally('count')
-    assert_that(result_mean, equal_to([mean]), label='assert:mean')
-    assert_that(result_count, equal_to([size]), label='assert:size')
-
-    # Again for per-key combines.
-    pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
-    result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
-    result_key_count = pcoll | combine.Count.PerKey('count-perkey')
-    assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
-    assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
-    pipeline.run()
-
-  def test_top(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    # A parameter we'll be sharing with a custom comparator.
-    names = {0: 'zo',
-             1: 'one',
-             2: 'twoo',
-             3: 'three',
-             5: 'fiiive',
-             6: 'sssssix',
-             9: 'nniiinne'}
-
-    # First for global combines.
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | combine.Top.Largest('top', 5)
-    result_bot = pcoll | combine.Top.Smallest('bot', 4)
-    result_cmp = pcoll | combine.Top.Of(
-        'cmp',
-        6,
-        lambda a, b, names: len(names[a]) < len(names[b]),
-        names)  # Note parameter passed to comparator.
-    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
-    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
-    assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
-
-    # Again for per-key combines.
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
-    result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
-    result_key_cmp = pcoll | combine.Top.PerKey(
-        'cmp-perkey',
-        6,
-        lambda a, b, names: len(names[a]) < len(names[b]),
-        names)  # Note parameter passed to comparator.
-    assert_that(result_key_top, equal_to([('a', [9, 6, 6, 5, 3])]),
-                label='key:top')
-    assert_that(result_key_bot, equal_to([('a', [0, 1, 1, 1])]),
-                label='key:bot')
-    assert_that(result_key_cmp, equal_to([('a', [9, 6, 6, 5, 3, 2])]),
-                label='key:cmp')
-    pipeline.run()
-
-  def test_top_shorthands(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | df.CombineGlobally('top', combiners.Largest(5))
-    result_bot = pcoll | df.CombineGlobally('bot', combiners.Smallest(4))
-    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
-    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
-
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_ktop = pcoll | df.CombinePerKey('top-perkey', combiners.Largest(5))
-    result_kbot = pcoll | df.CombinePerKey('bot-perkey', combiners.Smallest(4))
-    assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
-    assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
-    pipeline.run()
-
-  def test_sample(self):
-
-    # First test global samples (lots of them).
-    for ix in xrange(300):
-      pipeline = Pipeline('DirectPipelineRunner')
-      pcoll = pipeline | Create('start', [1, 1, 2, 2])
-      result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
-      def matcher():
-        def match(actual):
-          # There is always exactly one result.
-          equal_to([1])([len(actual)])
-          # There are always exactly three samples in the result.
-          equal_to([3])([len(actual[0])])
-          # Sampling is without replacement.
-          num_ones = sum(1 for x in actual[0] if x == 1)
-          num_twos = sum(1 for x in actual[0] if x == 2)
-          equal_to([1, 2])([num_ones, num_twos])
-        return match
-      assert_that(result, matcher())
-      pipeline.run()
-
-    # Now test per-key samples.
-    pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | Create(
-        'start-perkey',
-        sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
-    result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
-    def matcher():
-      def match(actual):
-        for _, samples in actual:
-          equal_to([3])([len(samples)])
-          num_ones = sum(1 for x in samples if x == 1)
-          num_twos = sum(1 for x in samples if x == 2)
-          equal_to([1, 2])([num_ones, num_twos])
-      return match
-    assert_that(result, matcher())
-    pipeline.run()
-
-  def test_tuple_combine_fn(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (
-        p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
-        | df.CombineGlobally(combine.TupleCombineFn(max,
-                                                    combine.MeanCombineFn(),
-                                                    sum)).without_defaults())
-    assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
-    p.run()
-
-  def test_tuple_combine_fn_without_defaults(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = (
-        p
-        | Create([1, 1, 2, 3])
-        | df.CombineGlobally(
-            combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
-            .with_common_input()).without_defaults())
-    assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
-    p.run()
-
-  def test_to_list_and_to_dict(self):
-    pipeline = Pipeline('DirectPipelineRunner')
-    the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    pcoll = pipeline | Create('start', the_list)
-    result = pcoll | combine.ToList('to list')
-    def matcher(expected):
-      def match(actual):
-        equal_to(expected[0])(actual[0])
-      return match
-    assert_that(result, matcher([the_list]))
-    pipeline.run()
-
-    pipeline = Pipeline('DirectPipelineRunner')
-    pairs = [(1, 2), (3, 4), (5, 6)]
-    pcoll = pipeline | Create('start-pairs', pairs)
-    result = pcoll | combine.ToDict('to dict')
-    def matcher():
-      def match(actual):
-        equal_to([1])([len(actual)])
-        equal_to(pairs)(actual[0].iteritems())
-      return match
-    assert_that(result, matcher())
-    pipeline.run()
-
-  def test_combine_globally_with_default(self):
-    p = Pipeline('DirectPipelineRunner')
-    assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
-    p.run()
-
-  def test_combine_globally_without_default(self):
-    p = Pipeline('DirectPipelineRunner')
-    result = p | Create([]) | CombineGlobally(sum).without_defaults()
-    assert_that(result, equal_to([]))
-    p.run()
-
-  def test_combine_globally_with_default_side_input(self):
-    class CombineWithSideInput(PTransform):
-      def apply(self, pcoll):
-        side = pcoll | CombineGlobally(sum).as_singleton_view()
-        main = pcoll.pipeline | Create([None])
-        return main | Map(lambda _, s: s, side)
-
-    p = Pipeline('DirectPipelineRunner')
-    result1 = p | Create('label1', []) | CombineWithSideInput('L1')
-    result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
-    assert_that(result1, equal_to([0]), label='r1')
-    assert_that(result2, equal_to([10]), label='r2')
-    p.run()
-
-
-if __name__ == '__main__':
-  unittest.main()

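For reference, the values asserted in test_top above can be reproduced with a short stdlib-only sketch (plain Python, independent of the Dataflow SDK shown in this diff; heapq.nlargest/nsmallest stand in here for Top.Largest/Top.Smallest, and the key function plays the role of the custom comparator passed to Top.Of):

    import heapq

    vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
    names = {0: 'zo', 1: 'one', 2: 'twoo', 3: 'three',
             5: 'fiiive', 6: 'sssssix', 9: 'nniiinne'}

    # Top.Largest(5) / Top.Smallest(4) reduce to n-largest / n-smallest.
    print(heapq.nlargest(5, vals))    # [9, 6, 6, 5, 3]
    print(heapq.nsmallest(4, vals))   # [0, 1, 1, 1]

    # Top.Of(6, cmp, names) keeps the 6 "largest" elements under the custom
    # ordering; here a longer name means a larger element.
    print(heapq.nlargest(6, vals, key=lambda v: len(names[v])))  # [9, 6, 6, 5, 3, 2]
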
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/core.py b/sdks/python/google/cloud/dataflow/transforms/core.py
deleted file mode 100644
index 6db0099..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/core.py
+++ /dev/null
@@ -1,1292 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
-
-from __future__ import absolute_import
-
-import copy
-import uuid
-
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow import typehints
-from google.cloud.dataflow.coders import typecoders
-from google.cloud.dataflow.internal import util
-from google.cloud.dataflow.pvalue import AsIter
-from google.cloud.dataflow.pvalue import AsSingleton
-from google.cloud.dataflow.transforms import ptransform
-from google.cloud.dataflow.transforms import window
-from google.cloud.dataflow.transforms.ptransform import PTransform
-from google.cloud.dataflow.transforms.ptransform import ptransform_fn
-from google.cloud.dataflow.transforms.ptransform import PTransformWithSideInputs
-from google.cloud.dataflow.transforms.window import MIN_TIMESTAMP
-from google.cloud.dataflow.transforms.window import OutputTimeFn
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.transforms.window import WindowFn
-from google.cloud.dataflow.typehints import Any
-from google.cloud.dataflow.typehints import get_type_hints
-from google.cloud.dataflow.typehints import is_consistent_with
-from google.cloud.dataflow.typehints import Iterable
-from google.cloud.dataflow.typehints import KV
-from google.cloud.dataflow.typehints import trivial_inference
-from google.cloud.dataflow.typehints import TypeCheckError
-from google.cloud.dataflow.typehints import Union
-from google.cloud.dataflow.typehints import WithTypeHints
-from google.cloud.dataflow.typehints.trivial_inference import element_type
-from google.cloud.dataflow.utils.options import TypeOptions
-
-
-class DoFnProcessContext(object):
-  """A processing context passed to DoFn methods during execution.
-
-  Most importantly, a DoFn.process method will access context.element
-  to get the element it is supposed to process.
-
-  Attributes:
-    label: label of the ParDo whose element is being processed.
-    element: element being processed
-      (in process method only; always None in start_bundle and finish_bundle)
-    timestamp: timestamp of the element
-      (in process method only; always None in start_bundle and finish_bundle)
-    windows: windows of the element
-      (in process method only; always None in start_bundle and finish_bundle)
-    state: a DoFnState object, which holds the runner's internal state
-      for this element.  For example, aggregator state is here.
-      Not used by the pipeline code.
-  """
-
-  def __init__(self, label, element=None, state=None):
-    """Initialize a processing context object with an element and state.
-
-    The element represents one value from a PCollection that will be accessed
-    by a DoFn object during pipeline execution, and state is an arbitrary object
-    where counters and other pipeline state information can be passed in.
-
-    DoFnProcessContext objects are also used as inputs to PartitionFn instances.
-
-    Args:
-      label: label of the PCollection whose element is being processed.
-      element: element of a PCollection being processed using this context.
-      state: a DoFnState object with state to be passed in to the DoFn object.
-    """
-    self.label = label
-    self.state = state
-    if element is not None:
-      self.set_element(element)
-
-  def set_element(self, windowed_value):
-    if windowed_value is None:
-      # Not currently processing an element.
-      if hasattr(self, 'element'):
-        del self.element
-        del self.timestamp
-        del self.windows
-    else:
-      self.element = windowed_value.value
-      self.timestamp = windowed_value.timestamp
-      self.windows = windowed_value.windows
-
-  def aggregate_to(self, aggregator, input_value):
-    """Provide a new input value for the aggregator.
-
-    Args:
-      aggregator: the aggregator to update
-      input_value: the new value to input to the combine_fn of this aggregator.
-    """
-    self.state.counter_for(aggregator).update(input_value)
-
-
-class DoFn(WithTypeHints):
-  """A function object used by a transform with custom processing.
-
-  The ParDo transform is such a transform. The ParDo.apply
-  method will take an object of type DoFn and apply it to all elements of a
-  PCollection object.
-
-  In order to have concrete DoFn objects one has to subclass from DoFn and
-  define the desired behavior (start_bundle/finish_bundle and process) or wrap a
-  callable object using the CallableWrapperDoFn class.
-  """
-
-  def default_label(self):
-    return self.__class__.__name__
-
-  def infer_output_type(self, input_type):
-    # TODO(robertwb): Side inputs types.
-    # TODO(robertwb): Assert compatibility with input type hint?
-    return self._strip_output_annotations(
-        trivial_inference.infer_return_type(self.process, [input_type]))
-
-  def start_bundle(self, context, *args, **kwargs):
-    """Called before a bundle of elements is processed on a worker.
-
-    Elements to be processed are split into bundles and distributed
-    to workers.  Before a worker calls process() on the first element
-    of its bundle, it calls this method.
-
-    Args:
-      context: a DoFnProcessContext object
-      *args: side inputs
-      **kwargs: keyword side inputs
-
-    """
-    pass
-
-  def finish_bundle(self, context, *args, **kwargs):
-    """Called after a bundle of elements is processed on a worker.
-
-    Args:
-      context: a DoFnProcessContext object
-      *args: side inputs
-      **kwargs: keyword side inputs
-    """
-    pass
-
-  def process(self, context, *args, **kwargs):
-    """Called for each element of a pipeline.
-
-    Args:
-      context: a DoFnProcessContext object containing, among other
-        attributes, the element to be processed.
-        See the DoFnProcessContext documentation for details.
-      *args: side inputs
-      **kwargs: keyword side inputs
-    """
-    raise NotImplementedError
-
-  @staticmethod
-  def from_callable(fn):
-    return CallableWrapperDoFn(fn)
-
-  def process_argspec_fn(self):
-    """Returns the Python callable that will eventually be invoked.
-
-    This should ideally be the user-level function that is called with
-    the main and (if any) side inputs, and is used to relate the type
-    hint parameters with the input parameters (e.g., by argument name).
-    """
-    return self.process
-
-  def _strip_output_annotations(self, type_hint):
-    annotations = (window.TimestampedValue, window.WindowedValue,
-                   pvalue.SideOutputValue)
-    # TODO(robertwb): These should be parameterized types that the
-    # type inferencer understands.
-    if (type_hint in annotations
-        or trivial_inference.element_type(type_hint) in annotations):
-      return Any
-    else:
-      return type_hint
-
-
-class CallableWrapperDoFn(DoFn):
-  """A DoFn (function) object wrapping a callable object.
-
-  The purpose of this class is to conveniently wrap simple functions and use
-  them in transforms.
-  """
-
-  def __init__(self, fn):
-    """Initializes a CallableWrapperDoFn object wrapping a callable.
-
-    Args:
-      fn: A callable object.
-
-    Raises:
-      TypeError: if fn parameter is not a callable type.
-    """
-    if not callable(fn):
-      raise TypeError('Expected a callable object instead of: %r' % fn)
-
-    self._fn = fn
-
-    super(CallableWrapperDoFn, self).__init__()
-
-  def __repr__(self):
-    return 'CallableWrapperDoFn(%s)' % self._fn
-
-  def default_type_hints(self):
-    type_hints = get_type_hints(self._fn)
-    # If the fn was a DoFn annotated with a type-hint that hinted a return
-    # type compatible with Iterable[Any], then we strip off the outer
-    # container type due to the 'flatten' portion of FlatMap.
-    # TODO(robertwb): Should we require an iterable specification for FlatMap?
-    if type_hints.output_types:
-      args, kwargs = type_hints.output_types
-      if len(args) == 1 and is_consistent_with(args[0], Iterable[Any]):
-        type_hints = type_hints.copy()
-        type_hints.set_output_types(element_type(args[0]), **kwargs)
-    return type_hints
-
-  def infer_output_type(self, input_type):
-    return self._strip_output_annotations(
-        trivial_inference.infer_return_type(self._fn, [input_type]))
-
-  def process(self, context, *args, **kwargs):
-    return self._fn(context.element, *args, **kwargs)
-
-  def process_argspec_fn(self):
-    return getattr(self._fn, '_argspec_fn', self._fn)
-
-
-class CombineFn(WithTypeHints):
-  """A function object used by a Combine transform with custom processing.
-
-  A CombineFn specifies how multiple values in all or part of a PCollection can
-  be merged into a single value---essentially providing the same kind of
-  information as the arguments to the Python "reduce" builtin (except for the
-  input argument, which is an instance of CombineFnProcessContext). The
-  combining process proceeds as follows:
-
-  1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
-     initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_inputs method is invoked to
-     combine more values with the accumulator for that batch.
-  4. The merge_accumulators method is invoked to combine accumulators from
-     separate batches into a single combined output accumulator value, once all
-     of the accumulators have had all the input value in their batches added to
-     them. This operation is invoked repeatedly, until there is only one
-     accumulator value left.
-  5. The extract_output operation is invoked on the final accumulator to get
-     the output value.
-  """
-
-  def default_label(self):
-    return self.__class__.__name__
-
-  def create_accumulator(self, *args, **kwargs):
-    """Return a fresh, empty accumulator for the combine operation.
-
-    Args:
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    raise NotImplementedError(str(self))
-
-  def add_input(self, accumulator, element, *args, **kwargs):
-    """Return result of folding element into accumulator.
-
-    CombineFn implementors must override either add_input or add_inputs.
-
-    Args:
-      accumulator: the current accumulator
-      element: the element to add
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    raise NotImplementedError(str(self))
-
-  def add_inputs(self, accumulator, elements, *args, **kwargs):
-    """Returns the result of folding each element in elements into accumulator.
-
-    This is provided in case the implementation affords more efficient
-    bulk addition of elements. The default implementation simply loops
-    over the inputs invoking add_input for each one.
-
-    Args:
-      accumulator: the current accumulator
-      elements: the elements to add
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    for element in elements:
-      accumulator = self.add_input(accumulator, element, *args, **kwargs)
-    return accumulator
-
-  def merge_accumulators(self, accumulators, *args, **kwargs):
-    """Returns the result of merging several accumulators
-    to a single accumulator value.
-
-    Args:
-      accumulators: the accumulators to merge
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    raise NotImplementedError(str(self))
-
-  def extract_output(self, accumulator, *args, **kwargs):
-    """Return result of converting accumulator into the output value.
-
-    Args:
-      accumulator: the final accumulator value computed by this CombineFn
-        for the entire input key or PCollection.
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    raise NotImplementedError(str(self))
-
-  def apply(self, elements, *args, **kwargs):
-    """Returns result of applying this CombineFn to the input values.
-
-    Args:
-      elements: the set of values to combine.
-      *args: Additional arguments and side inputs.
-      **kwargs: Additional arguments and side inputs.
-    """
-    return self.extract_output(
-        self.add_inputs(
-            self.create_accumulator(*args, **kwargs), elements,
-            *args, **kwargs),
-        *args, **kwargs)
-
-  def for_input_type(self, input_type):
-    """Returns a specialized implementation of self, if it exists.
-
-    Otherwise, returns self.
-
-    Args:
-      input_type: the type of input elements.
-    """
-    return self
-
-  @staticmethod
-  def from_callable(fn):
-    return CallableWrapperCombineFn(fn)
-
-  @staticmethod
-  def maybe_from_callable(fn):
-    return fn if isinstance(fn, CombineFn) else CallableWrapperCombineFn(fn)
-
-
-class CallableWrapperCombineFn(CombineFn):
-  """A CombineFn (function) object wrapping a callable object.
-
-  The purpose of this class is to conveniently wrap simple functions and use
-  them in Combine transforms.
-  """
-  _EMPTY = object()
-
-  def __init__(self, fn):
-    """Initializes a CallableFn object wrapping a callable.
-
-    Args:
-      fn: A callable object that reduces elements of an iterable to a single
-        value (like the builtins sum and max). This callable must be capable of
-        receiving the kind of values it generates as output in its input, and
-        for best results, its operation must be commutative and associative.
-
-    Raises:
-      TypeError: if fn parameter is not a callable type.
-    """
-    if not callable(fn):
-      raise TypeError('Expected a callable object instead of: %r' % fn)
-
-    super(CallableWrapperCombineFn, self).__init__()
-    self._fn = fn
-
-  def __repr__(self):
-    return "CallableWrapperCombineFn(%s)" % self._fn
-
-  def create_accumulator(self, *args, **kwargs):
-    return self._EMPTY
-
-  def add_input(self, accumulator, element, *args, **kwargs):
-    if accumulator is self._EMPTY:
-      return element
-    else:
-      return self._fn([accumulator, element], *args, **kwargs)
-
-  def add_inputs(self, accumulator, elements, *args, **kwargs):
-    if accumulator is self._EMPTY:
-      return self._fn(elements, *args, **kwargs)
-    elif isinstance(elements, (list, tuple)):
-      return self._fn([accumulator] + elements, *args, **kwargs)
-    else:
-      def union():
-        yield accumulator
-        for e in elements:
-          yield e
-      return self._fn(union(), *args, **kwargs)
-
-  def merge_accumulators(self, accumulators, *args, **kwargs):
-    # It's (weakly) assumed that self._fn is associative.
-    return self._fn(accumulators, *args, **kwargs)
-
-  def extract_output(self, accumulator, *args, **kwargs):
-    return self._fn(()) if accumulator is self._EMPTY else accumulator
-
-  def default_type_hints(self):
-    fn_hints = get_type_hints(self._fn)
-    if fn_hints.input_types is None:
-      return fn_hints
-    else:
-      # fn(Iterable[V]) -> V becomes CombineFn(V) -> V
-      input_args, input_kwargs = fn_hints.input_types
-      if not input_args:
-        if len(input_kwargs) == 1:
-          input_args, input_kwargs = tuple(input_kwargs.values()), {}
-        else:
-          raise TypeError('Combiner input type must be specified positionally.')
-      if not is_consistent_with(input_args[0], Iterable[Any]):
-        raise TypeCheckError(
-            'All functions for a Combine PTransform must accept a '
-            'single argument compatible with: Iterable[Any]. '
-            'Instead a function with input type: %s was received.'
-             % input_args[0])
-      input_args = (element_type(input_args[0]),) + input_args[1:]
-      # TODO(robertwb): Assert output type is consistent with input type?
-      hints = fn_hints.copy()
-      hints.set_input_types(*input_args, **input_kwargs)
-      return hints
-
-  def for_input_type(self, input_type):
-    # Avoid circular imports.
-    from google.cloud.dataflow.transforms import cy_combiners
-    if self._fn is any:
-      return cy_combiners.AnyCombineFn()
-    elif self._fn is all:
-      return cy_combiners.AllCombineFn()
-    else:
-      known_types = {
-          (sum, int): cy_combiners.SumInt64Fn(),
-          (min, int): cy_combiners.MinInt64Fn(),
-          (max, int): cy_combiners.MaxInt64Fn(),
-          (sum, float): cy_combiners.SumFloatFn(),
-          (min, float): cy_combiners.MinFloatFn(),
-          (max, float): cy_combiners.MaxFloatFn(),
-      }
-    return known_types.get((self._fn, input_type), self)
-
-
-class PartitionFn(WithTypeHints):
-  """A function object used by a Partition transform.
-
-  A PartitionFn specifies how individual values in a PCollection will be placed
-  into separate partitions, indexed by an integer.
-  """
-
-  def default_label(self):
-    return self.__class__.__name__
-
-  def partition_for(self, context, num_partitions, *args, **kwargs):
-    """Specify which partition will receive this element.
-
-    Args:
-      context: A DoFnProcessContext containing an element of the
-        input PCollection.
-      num_partitions: Number of partitions, i.e., output PCollections.
-      *args: optional parameters and side inputs.
-      **kwargs: optional parameters and side inputs.
-
-    Returns:
-      An integer in [0, num_partitions).
-    """
-    pass
-
-
-class CallableWrapperPartitionFn(PartitionFn):
-  """A PartitionFn object wrapping a callable object.
-
-  Instances of this class wrap simple functions for use in Partition operations.
-  """
-
-  def __init__(self, fn):
-    """Initializes a PartitionFn object wrapping a callable.
-
-    Args:
-      fn: A callable object, which should accept the following arguments:
-            element - element to assign to a partition.
-            num_partitions - number of output partitions.
-          and may accept additional arguments and side inputs.
-
-    Raises:
-      TypeError: if fn is not a callable type.
-    """
-    if not callable(fn):
-      raise TypeError('Expected a callable object instead of: %r' % fn)
-    self._fn = fn
-
-  def partition_for(self, context, num_partitions, *args, **kwargs):
-    return self._fn(context.element, num_partitions, *args, **kwargs)
-
-
-class ParDo(PTransformWithSideInputs):
-  """A ParDo transform.
-
-  Processes an input PCollection by applying a DoFn to each element and
-  returning the accumulated results into an output PCollection. The type of the
-  elements is not fixed as long as the DoFn can deal with it. In reality
-  the type is restrained to some extent because the elements sometimes must be
-  persisted to external storage. See the apply() method comments for a detailed
-  description of all possible arguments.
-
-  Note that the DoFn must return an iterable for each element of the input
-  PCollection.  An easy way to do this is to use the yield keyword in the
-  process method.
-
-  Args:
-      label: name of this transform instance. Useful while monitoring and
-        debugging a pipeline execution.
-      pcoll: a PCollection to be processed.
-      dofn: a DoFn object to be applied to each element of pcoll argument.
-      *args: positional arguments passed to the dofn object.
-      **kwargs:  keyword arguments passed to the dofn object.
-
-  Note that the positional and keyword arguments will be processed in order
-  to detect PCollections that will be computed as side inputs to the
-  transform. During pipeline execution whenever the DoFn object gets executed
-  (its apply() method gets called) the PCollection arguments will be replaced
-  by values from the PCollection in the exact positions where they appear in
-  the argument lists.
-  """
-
-  def __init__(self, fn_or_label, *args, **kwargs):
-    super(ParDo, self).__init__(fn_or_label, *args, **kwargs)
-
-    if not isinstance(self.fn, DoFn):
-      raise TypeError('ParDo must be called with a DoFn instance.')
-
-  def default_type_hints(self):
-    return self.fn.get_type_hints()
-
-  def infer_output_type(self, input_type):
-    return trivial_inference.element_type(
-        self.fn.infer_output_type(input_type))
-
-  def make_fn(self, fn):
-    return fn if isinstance(fn, DoFn) else CallableWrapperDoFn(fn)
-
-  def process_argspec_fn(self):
-    return self.fn.process_argspec_fn()
-
-  def apply(self, pcoll):
-    self.side_output_tags = set()
-    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
-    self.dofn = self.fn
-    return pvalue.PCollection(pcoll.pipeline)
-
-  def with_outputs(self, *tags, **main_kw):
-    """Returns a tagged tuple allowing access to the outputs of a ParDo.
-
-    The resulting object supports access to the
-    PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over
-    the available tags (e.g., for tag in o: ...).
-
-    Args:
-      *tags: if non-empty, list of valid tags. If a list of valid tags is given,
-        it will be an error to use an undeclared tag later in the pipeline.
-      **main_kw: dictionary empty or with one key 'main' defining the tag to be
-        used for the main output (which will not have a tag associated with it).
-
-    Returns:
-      An object of type DoOutputsTuple that bundles together all the outputs
-      of a ParDo transform and allows accessing the individual
-      PCollections for each output using an object.tag syntax.
-
-    Raises:
-      TypeError: if the self object is not a PCollection that is the result of
-        a ParDo transform.
-      ValueError: if main_kw contains any key other than 'main'.
-    """
-    main_tag = main_kw.pop('main', None)
-    if main_kw:
-      raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
-    return _MultiParDo(self, tags, main_tag)
-
-
-class _MultiParDo(PTransform):
-
-  def __init__(self, do_transform, tags, main_tag):
-    super(_MultiParDo, self).__init__(do_transform.label)
-    self._do_transform = do_transform
-    self._tags = tags
-    self._main_tag = main_tag
-
-  def apply(self, pcoll):
-    _ = pcoll | self._do_transform
-    return pvalue.DoOutputsTuple(
-        pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
-
-
-def FlatMap(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
-  """FlatMap is like ParDo except it takes a callable to specify the
-  transformation.
-
-  The callable must return an iterable for each element of the input
-  PCollection.  The elements of these iterables will be flattened into
-  the output PCollection.
-
-  Args:
-    fn_or_label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    *args: positional arguments passed to the transform callable.
-    **kwargs: keyword arguments passed to the transform callable.
-
-  Returns:
-    A PCollection containing the Map outputs.
-
-  Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for ParDo.
-  """
-  if fn_or_label is None or isinstance(fn_or_label, str):
-    label, fn, args = fn_or_label, args[0], args[1:]
-  else:
-    label, fn = None, fn_or_label
-  if not callable(fn):
-    raise TypeError(
-        'FlatMap can be used only with callable objects. '
-        'Received %r instead for %s argument.'
-        % (fn, 'first' if label is None else 'second'))
-
-  if label is None:
-    label = 'FlatMap(%s)' % ptransform.label_from_callable(fn)
-
-  return ParDo(label, CallableWrapperDoFn(fn), *args, **kwargs)
-
-
-def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
-  """Map is like FlatMap except its callable returns only a single element.
-
-  Args:
-    fn_or_label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    *args: positional arguments passed to the transform callable.
-    **kwargs: keyword arguments passed to the transform callable.
-
-  Returns:
-    A PCollection containing the Map outputs.
-
-  Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for ParDo.
-  """
-  if isinstance(fn_or_label, str):
-    label, fn, args = fn_or_label, args[0], args[1:]
-  else:
-    label, fn = None, fn_or_label
-  if not callable(fn):
-    raise TypeError(
-        'Map can be used only with callable objects. '
-        'Received %r instead for %s argument.'
-        % (fn, 'first' if label is None else 'second'))
-  wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
-
-  # Proxy the type-hint information from the original function to this new
-  # wrapped function.
-  get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
-  output_hint = get_type_hints(fn).simple_output_type(label)
-  if output_hint:
-    get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint])
-  # pylint: disable=protected-access
-  wrapper._argspec_fn = fn
-  # pylint: enable=protected-access
-
-  if label is None:
-    label = 'Map(%s)' % ptransform.label_from_callable(fn)
-
-  return FlatMap(label, wrapper, *args, **kwargs)
-
-
-def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
-  """Filter is a FlatMap with its callable filtering out elements.
-
-  Args:
-    fn_or_label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    *args: positional arguments passed to the transform callable.
-    **kwargs: keyword arguments passed to the transform callable.
-
-  Returns:
-    A PCollection containing the Filter outputs.
-
-  Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for FlatMap.
-  """
-  if isinstance(fn_or_label, str):
-    label, fn, args = fn_or_label, args[0], args[1:]
-  else:
-    label, fn = None, fn_or_label
-  if not callable(fn):
-    raise TypeError(
-        'Filter can be used only with callable objects. '
-        'Received %r instead for %s argument.'
-        % (fn, 'first' if label is None else 'second'))
-  wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else []
-
-  # Proxy the type-hint information from the function being wrapped, setting the
-  # output type to be the same as the input type.
-  get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
-  output_hint = get_type_hints(fn).simple_output_type(label)
-  if (output_hint is None
-      and get_type_hints(wrapper).input_types
-      and get_type_hints(wrapper).input_types[0]):
-    output_hint = get_type_hints(wrapper).input_types[0]
-  if output_hint:
-    get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint])
-  # pylint: disable=protected-access
-  wrapper._argspec_fn = fn
-  # pylint: enable=protected-access
-
-  if label is None:
-    label = 'Filter(%s)' % ptransform.label_from_callable(fn)
-
-  return FlatMap(label, wrapper, *args, **kwargs)
-
-
-class CombineGlobally(PTransform):
-  """A CombineGlobally transform.
-
-  Reduces a PCollection to a single value by progressively applying a CombineFn
-  to portions of the PCollection (and to intermediate values created thereby).
-  See documentation in CombineFn for details on the specifics on how CombineFns
-  are applied.
-
-  Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    pcoll: a PCollection to be reduced into a single value.
-    fn: a CombineFn object that will be called to progressively reduce the
-      PCollection into single values, or a callable suitable for wrapping
-      by CallableWrapperCombineFn.
-    *args: positional arguments passed to the CombineFn object.
-    **kwargs: keyword arguments passed to the CombineFn object.
-
-  Raises:
-    TypeError: If the output type of the input PCollection is not compatible
-      with Iterable[A].
-
-  Returns:
-    A single-element PCollection containing the main output of the Combine
-    transform.
-
-  Note that the positional and keyword arguments will be processed in order
-  to detect PObjects that will be computed as side inputs to the transform.
-  During pipeline execution whenever the CombineFn object gets executed (i.e.,
-  any of the CombineFn methods get called), the PObject arguments will be
-  replaced by their actual value in the exact position where they appear in
-  the argument lists.
-  """
-  has_defaults = True
-  as_view = False
-
-  def __init__(self, label_or_fn, *args, **kwargs):
-    if label_or_fn is None or isinstance(label_or_fn, str):
-      label, fn, args = label_or_fn, args[0], args[1:]
-    else:
-      label, fn = None, label_or_fn
-
-    super(CombineGlobally, self).__init__(label)
-    self.fn = fn
-    self.args = args
-    self.kwargs = kwargs
-
-  def default_label(self):
-    return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
-
-  def clone(self, **extra_attributes):
-    clone = copy.copy(self)
-    clone.__dict__.update(extra_attributes)
-    return clone
-
-  def with_defaults(self, has_defaults=True):
-    return self.clone(has_defaults=has_defaults)
-
-  def without_defaults(self):
-    return self.with_defaults(False)
-
-  def as_singleton_view(self):
-    return self.clone(as_view=True)
-
-  def apply(self, pcoll):
-    def add_input_types(transform):
-      type_hints = self.get_type_hints()
-      if type_hints.input_types:
-        return transform.with_input_types(type_hints.input_types[0][0])
-      else:
-        return transform
-
-    combined = (pcoll
-            | add_input_types(Map('KeyWithVoid', lambda v: (None, v))
-               .with_output_types(KV[None, pcoll.element_type]))
-            | CombinePerKey('CombinePerKey', self.fn, *self.args, **self.kwargs)
-            | Map('UnKey', lambda (k, v): v))
-
-    if not self.has_defaults and not self.as_view:
-      return combined
-
-    if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn)
-          else CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
-    else:
-      default_value = pvalue._SINGLETON_NO_DEFAULT  # pylint: disable=protected-access
-    view = pvalue.AsSingleton(combined, default_value=default_value)
-    if self.as_view:
-      return view
-    else:
-      if pcoll.windowing.windowfn != window.GlobalWindows():
-        raise ValueError(
-            "Default values are not yet supported in CombineGlobally() if the "
-            "output  PCollection is not windowed by GlobalWindows. "
-            "Instead, use CombineGlobally().without_defaults() to output "
-            "an empty PCollection if the input PCollection is empty, "
-            "or CombineGlobally().as_singleton_view() to get the default "
-            "output of the CombineFn if the input PCollection is empty.")
-      def typed(transform):
-        # TODO(robertwb): We should infer this.
-        if combined.element_type:
-          return transform.with_output_types(combined.element_type)
-        else:
-          return transform
-      return (pcoll.pipeline
-              | Create('DoOnce', [None])
-              | typed(Map('InjectDefault', lambda _, s: s, view)))
-
-
-@ptransform_fn
-def CombinePerKey(label, pcoll, fn, *args, **kwargs):  # pylint: disable=invalid-name
-  """A per-key Combine transform.
-
-  Identifies sets of values associated with the same key in the input
-  PCollection, then applies a CombineFn to condense those sets to single
-  values. See documentation in CombineFn for details on the specifics on how
-  CombineFns are applied.
-
-  Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    pcoll: input pcollection.
-    fn: instance of CombineFn to apply to all values under the same key in
-      pcoll, or a callable whose signature is f(iterable, *args, **kwargs)
-      (e.g., sum, max).
-    *args: arguments and side inputs, passed directly to the CombineFn.
-    **kwargs: arguments and side inputs, passed directly to the CombineFn.
-
-  Returns:
-    A PObject holding the result of the combine operation.
-  """
-  return pcoll | GroupByKey() | CombineValues('Combine', fn, *args, **kwargs)
-
-
-# TODO(robertwb): Rename to CombineGroupedValues?
-class CombineValues(PTransformWithSideInputs):
-
-  def make_fn(self, fn):
-    return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn)
-
-  def apply(self, pcoll):
-    args, kwargs = util.insert_values_in_args(
-      self.args, self.kwargs, self.side_inputs)
-
-    input_type = pcoll.element_type
-    key_type = None
-    if input_type is not None:
-      key_type, _ = input_type.tuple_types
-
-    runtime_type_check = (
-        pcoll.pipeline.options is not None and
-        pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
-    return pcoll | ParDo(
-        CombineValuesDoFn(key_type, self.fn, runtime_type_check),
-        *args, **kwargs)
-
-
-class CombineValuesDoFn(DoFn):
-  """DoFn for performing per-key Combine transforms."""
-
-  def __init__(self, input_pcoll_type, combinefn, runtime_type_check):
-    super(CombineValuesDoFn, self).__init__()
-    self.combinefn = combinefn
-    self.runtime_type_check = runtime_type_check
-
-  def process(self, p_context, *args, **kwargs):
-    # Expected elements input to this DoFn are 2-tuples of the form
-    # (key, iter), with iter an iterable of all the values associated with key
-    # in the input PCollection.
-    if self.runtime_type_check:
-      # Apply the combiner in a single operation rather than artificially
-      # breaking it up so that output type violations manifest as TypeCheck
-      # errors rather than type errors.
-      return [
-          (p_context.element[0],
-           self.combinefn.apply(p_context.element[1], *args, **kwargs))]
-    else:
-      # Add the elements into three accumulators (for testing of merge).
-      elements = p_context.element[1]
-      accumulators = []
-      for k in range(3):
-        if len(elements) <= k:
-          break
-        accumulators.append(
-            self.combinefn.add_inputs(
-                self.combinefn.create_accumulator(*args, **kwargs),
-                elements[k::3],
-                *args, **kwargs))
-      # Merge the accumulators.
-      accumulator = self.combinefn.merge_accumulators(
-          accumulators, *args, **kwargs)
-      # Convert accumulator to the final result.
-      return [(p_context.element[0],
-               self.combinefn.extract_output(accumulator, *args, **kwargs))]
-
-  def default_type_hints(self):
-    hints = self.combinefn.get_type_hints().copy()
-    if hints.input_types:
-      K = typehints.TypeVariable('K')
-      args, kwargs = hints.input_types
-      args = (typehints.Tuple[K, typehints.Iterable[args[0]]],) + args[1:]
-      hints.set_input_types(*args, **kwargs)
-    else:
-      K = typehints.Any
-    if hints.output_types:
-      main_output_type = hints.simple_output_type('')
-      hints.set_output_types(typehints.Tuple[K, main_output_type])
-    return hints
-
-
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-@typehints.with_input_types(typehints.KV[K, V])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKey(PTransform):
-  """A group by key transform.
-
-  Processes an input PCollection consisting of key/value pairs represented as a
-  tuple pair. The result is a PCollection where values having a common key are
-  grouped together.  For example (a, 1), (b, 2), (a, 3) will result into
-  (a, [1, 3]), (b, [2]).
-
-  The implementation here is used only when run on the local direct runner.
-  """
-
-  class ReifyWindows(DoFn):
-
-    def process(self, context):
-      try:
-        k, v = context.element
-      except TypeError:
-        raise TypeCheckError('Input to GroupByKey must be a PCollection with '
-                             'elements compatible with KV[A, B]')
-
-      return [(k, window.WindowedValue(v, context.timestamp, context.windows))]
-
-    def infer_output_type(self, input_type):
-      key_type, value_type = trivial_inference.key_value_types(input_type)
-      return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
-
-  class GroupAlsoByWindow(DoFn):
-    # TODO(robertwb): Support combiner lifting.
-
-    def __init__(self, windowing):
-      super(GroupByKey.GroupAlsoByWindow, self).__init__()
-      self.windowing = windowing
-
-    def infer_output_type(self, input_type):
-      key_type, windowed_value_iter_type = trivial_inference.key_value_types(
-          input_type)
-      value_type = windowed_value_iter_type.inner_type.inner_type
-      return Iterable[KV[key_type, Iterable[value_type]]]
-
-    def process(self, context):
-      k, vs = context.element
-      # pylint: disable=g-import-not-at-top
-      from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState
-      from google.cloud.dataflow.transforms.trigger import create_trigger_driver
-      # pylint: enable=g-import-not-at-top
-      driver = create_trigger_driver(self.windowing, True)
-      state = InMemoryUnmergedState()
-      # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
-        yield wvalue.with_value((k, wvalue.value))
-      while state.timers:
-        fired = state.get_and_clear_timers()
-        for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in driver.process_timer(
-              timer_window, name, time_domain, fire_time, state):
-            yield wvalue.with_value((k, wvalue.value))
-
-  def apply(self, pcoll):
-    # This code path is only used in the local direct runner.  For Dataflow
-    # runner execution, the GroupByKey transform is expanded on the service.
-    input_type = pcoll.element_type
-
-    if input_type is not None:
-      # Initialize type-hints used below to enforce type-checking and to pass
-      # downstream to further PTransforms.
-      key_type, value_type = trivial_inference.key_value_types(input_type)
-      typecoders.registry.verify_deterministic(
-          typecoders.registry.get_coder(key_type),
-          'GroupByKey operation "%s"' % self.label)
-
-      reify_output_type = KV[key_type, typehints.WindowedValue[value_type]]
-      gbk_input_type = KV[key_type, Iterable[typehints.WindowedValue[value_type]]]
-      gbk_output_type = KV[key_type, Iterable[value_type]]
-
-      return (pcoll
-              | (ParDo('reify_windows', self.ReifyWindows())
-                 .with_output_types(reify_output_type))
-              | (GroupByKeyOnly('group_by_key')
-                 .with_input_types(reify_output_type)
-                 .with_output_types(gbk_input_type))
-              | (ParDo('group_by_window',
-                       self.GroupAlsoByWindow(pcoll.windowing))
-                 .with_input_types(gbk_input_type)
-                 .with_output_types(gbk_output_type)))
-    else:
-      return (pcoll
-              | ParDo('reify_windows', self.ReifyWindows())
-              | GroupByKeyOnly('group_by_key')
-              | ParDo('group_by_window',
-                      self.GroupAlsoByWindow(pcoll.windowing)))
-
-
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-@typehints.with_input_types(typehints.KV[K, V])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class GroupByKeyOnly(PTransform):
-  """A group by key transform, ignoring windows."""
-
-  def __init__(self, label=None):
-    super(GroupByKeyOnly, self).__init__(label)
-
-  def infer_output_type(self, input_type):
-    key_type, value_type = trivial_inference.key_value_types(input_type)
-    return KV[key_type, Iterable[value_type]]
-
-  def apply(self, pcoll):
-    self._check_pcollection(pcoll)
-    return pvalue.PCollection(pcoll.pipeline)
-
-
-class Partition(PTransformWithSideInputs):
-  """Split a PCollection into several partitions.
-
-  Uses the specified PartitionFn to separate an input PCollection into the
-  specified number of sub-PCollections.
-
-  When apply()d, a Partition() PTransform requires the following:
-
-  Args:
-    partitionfn: a PartitionFn, or a callable with the signature described in
-      CallableWrapperPartitionFn.
-    n: number of output partitions.
-
-  The result of this PTransform is a simple list of the output PCollections
-  representing each of n partitions, in order.
-  """
-
-  class ApplyPartitionFnFn(DoFn):
-    """A DoFn that applies a PartitionFn."""
-
-    def process(self, context, partitionfn, n, *args, **kwargs):
-      partition = partitionfn.partition_for(context, n, *args, **kwargs)
-      if not 0 <= partition < n:
-        raise ValueError(
-            'PartitionFn specified out-of-bounds partition index: '
-            '%d not in [0, %d)' % (partition, n))
-      # Each input is directed into the side output that corresponds to the
-      # selected partition.
-      yield pvalue.SideOutputValue(str(partition), context.element)
-
-  def make_fn(self, fn):
-    return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)
-
-  def apply(self, pcoll):
-    n = int(self.args[0])
-    return pcoll | ParDo(
-        self.ApplyPartitionFnFn(), self.fn, *self.args,
-        **self.kwargs).with_outputs(*[str(t) for t in range(n)])
-
-
-class Windowing(object):
-
-  def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
-               output_time_fn=None):
-    global AccumulationMode, DefaultTrigger
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.transforms.trigger import AccumulationMode, DefaultTrigger
-    # pylint: enable=g-import-not-at-top
-    if triggerfn is None:
-      triggerfn = DefaultTrigger()
-    if accumulation_mode is None:
-      if triggerfn == DefaultTrigger():
-        accumulation_mode = AccumulationMode.DISCARDING
-      else:
-        raise ValueError(
-            'accumulation_mode must be provided for non-trivial triggers')
-    self.windowfn = windowfn
-    self.triggerfn = triggerfn
-    self.accumulation_mode = accumulation_mode
-    self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW
-    self._is_default = (
-        self.windowfn == window.GlobalWindows() and
-        self.triggerfn == DefaultTrigger() and
-        self.accumulation_mode == AccumulationMode.DISCARDING and
-        self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW)
-
-  def __repr__(self):
-    return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
-                                          self.accumulation_mode,
-                                          self.output_time_fn)
-
-  def is_default(self):
-    return self._is_default
-
-
-T = typehints.TypeVariable('T')
-@typehints.with_input_types(T)
-@typehints.with_output_types(T)
-class WindowInto(ParDo):  # pylint: disable=g-wrong-blank-lines
-  """A window transform assigning windows to each element of a PCollection.
-
-  Transforms an input PCollection by applying a windowing function to each
-  element.  Each transformed element in the result will be a WindowedValue
-  element with the same input value and timestamp, with its new set of windows
-  determined by the windowing function.
-  """
-
-  class WindowIntoFn(DoFn):
-    """A DoFn that applies a WindowInto operation."""
-
-    def __init__(self, windowing):
-      self.windowing = windowing
-
-    def process(self, context):
-      context = WindowFn.AssignContext(context.timestamp,
-                                       element=context.element,
-                                       existing_windows=context.windows)
-      new_windows = self.windowing.windowfn.assign(context)
-      yield WindowedValue(context.element, context.timestamp, new_windows)
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a WindowInto transform.
-
-    Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, windowfn) or (windowfn).
-    The optional trigger and accumulation_mode kwargs may also be provided.
-    """
-    triggerfn = kwargs.pop('trigger', None)
-    accumulation_mode = kwargs.pop('accumulation_mode', None)
-    output_time_fn = kwargs.pop('output_time_fn', None)
-    label, windowfn = self.parse_label_and_arg(args, kwargs, 'windowfn')
-    self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
-                               output_time_fn)
-    dofn = self.WindowIntoFn(self.windowing)
-    super(WindowInto, self).__init__(label, dofn)
-
-  def get_windowing(self, unused_inputs):
-    return self.windowing
-
-  def infer_output_type(self, input_type):
-    return input_type
-
-  def apply(self, pcoll):
-    input_type = pcoll.element_type
-
-    if input_type is not None:
-      output_type = input_type
-      self.with_input_types(input_type)
-      self.with_output_types(output_type)
-    return super(WindowInto, self).apply(pcoll)
-
-
-# Python's pickling is broken for nested classes.
-WindowIntoFn = WindowInto.WindowIntoFn
-
-
-class Flatten(PTransform):
-  """Merges several PCollections into a single PCollection.
-
-  Copies all elements in 0 or more PCollections into a single output
-  PCollection. If there are no input PCollections, the resulting PCollection
-  will be empty (but see also kwargs below).
-
-  Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
-    **kwargs: Accepts a single named argument "pipeline", which specifies the
-      pipeline that "owns" this PTransform. Ordinarily Flatten can obtain this
-      information from one of the input PCollections, but if there are none (or
-      if there's a chance there may be none), this argument is the only way to
-      provide pipeline information and should be considered mandatory.
-  """
-
-  def __init__(self, label=None, **kwargs):
-    super(Flatten, self).__init__(label)
-    self.pipeline = kwargs.pop('pipeline', None)
-    if kwargs:
-      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
-
-  def _extract_input_pvalues(self, pvalueish):
-    try:
-      pvalueish = tuple(pvalueish)
-    except TypeError:
-      raise ValueError('Input to Flatten must be an iterable.')
-    return pvalueish, pvalueish
-
-  def apply(self, pcolls):
-    for pcoll in pcolls:
-      self._check_pcollection(pcoll)
-    return pvalue.PCollection(self.pipeline)
-
-  def get_windowing(self, inputs):
-    if not inputs:
-      # TODO(robertwb): Return something compatible with every windowing?
-      return Windowing(window.GlobalWindows())
-    else:
-      return super(Flatten, self).get_windowing(inputs)
-
-
-class Create(PTransform):
-  """A transform that creates a PCollection from an iterable."""
-
-  def __init__(self, *args, **kwargs):
-    """Initializes a Create transform.
-
-    Args:
-      *args: A tuple of positional arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, value) or (value).
-    """
-    label, value = self.parse_label_and_arg(args, kwargs, 'value')
-    super(Create, self).__init__(label)
-    if isinstance(value, basestring):
-      raise TypeError('PTransform Create: Refusing to treat string as '
-                      'an iterable. (string=%r)' % value)
-    elif isinstance(value, dict):
-      value = value.items()
-    self.value = tuple(value)
-
-  def infer_output_type(self, unused_input_type):
-    if not self.value:
-      return Any
-    else:
-      return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
-
-  def apply(self, pbegin):
-    assert isinstance(pbegin, pvalue.PBegin)
-    self.pipeline = pbegin.pipeline
-    return pvalue.PCollection(self.pipeline)
-
-  def get_windowing(self, unused_inputs):
-    return Windowing(window.GlobalWindows())
-
-
-def Read(*args, **kwargs):
-  from google.cloud.dataflow import io
-  return io.Read(*args, **kwargs)
-
-
-def Write(*args, **kwargs):
-  from google.cloud.dataflow import io
-  return io.Write(*args, **kwargs)
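
For context, the Create, Flatten, and WindowInto transforms removed above are
ordinary PTransforms and compose directly in a pipeline. The following is an
illustrative sketch only, not part of this commit; it assumes the pre-rename
google.cloud.dataflow import paths, the FixedWindows and TimestampedValue
helpers from the window module, and the DirectPipelineRunner used in the SDK's
tests.

    # Illustrative sketch, not part of this commit.
    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.transforms import window
    from google.cloud.dataflow.transforms.core import (
        Create, Flatten, Map, WindowInto)

    p = Pipeline('DirectPipelineRunner')

    # Create materializes an in-memory iterable as a PCollection.
    evens = p | Create('evens', range(0, 120, 2))
    odds = p | Create('odds', range(1, 120, 2))

    # Flatten merges several PCollections; the pipeline kwarg is only
    # required when the input tuple could be empty.
    merged = (evens, odds) | Flatten('merge', pipeline=p)

    # Give each element a timestamp so windowing has something to act on.
    stamped = merged | Map(
        'stamp', lambda t: window.TimestampedValue(t, t))

    # WindowInto re-windows each element; FixedWindows(60) assigns it to
    # the 60-second window containing its timestamp.
    windowed = stamped | WindowInto('fixed', window.FixedWindows(60))

    p.run()

Downstream grouping or combining transforms applied to windowed would then
aggregate per 60-second window rather than globally.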

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd b/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd
deleted file mode 100644
index d0ab833..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.pxd
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cimport cython
-from libc.stdint cimport int64_t, INT64_MIN, INT64_MAX
-
-cdef double _NEG_INF, _POS_INF, _NAN
-
-
-cdef class CountAccumulator(object):
-  cdef readonly int64_t value
-  cpdef add_input(self, unused_element)
-  @cython.locals(accumulator=CountAccumulator)
-  cpdef merge(self, accumulators)
-
-cdef class SumInt64Accumulator(object):
-  cdef readonly int64_t value
-  cpdef add_input(self, int64_t element)
-  @cython.locals(accumulator=SumInt64Accumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MinInt64Accumulator(object):
-  cdef readonly int64_t value
-  cpdef add_input(self, int64_t element)
-  @cython.locals(accumulator=MinInt64Accumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MaxInt64Accumulator(object):
-  cdef readonly int64_t value
-  cpdef add_input(self, int64_t element)
-  @cython.locals(accumulator=MaxInt64Accumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MeanInt64Accumulator(object):
-  cdef readonly int64_t sum
-  cdef readonly int64_t count
-  cpdef add_input(self, int64_t element)
-  @cython.locals(accumulator=MeanInt64Accumulator)
-  cpdef merge(self, accumulators)
-
-
-cdef class SumDoubleAccumulator(object):
-  cdef readonly double value
-  cpdef add_input(self, double element)
-  @cython.locals(accumulator=SumDoubleAccumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MinDoubleAccumulator(object):
-  cdef readonly double value
-  cpdef add_input(self, double element)
-  @cython.locals(accumulator=MinDoubleAccumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MaxDoubleAccumulator(object):
-  cdef readonly double value
-  cpdef add_input(self, double element)
-  @cython.locals(accumulator=MaxDoubleAccumulator)
-  cpdef merge(self, accumulators)
-
-cdef class MeanDoubleAccumulator(object):
-  cdef readonly double sum
-  cdef readonly int64_t count
-  cpdef add_input(self, double element)
-  @cython.locals(accumulator=MeanDoubleAccumulator)
-  cpdef merge(self, accumulators)
-
-
-cdef class AllAccumulator(object):
-  cdef readonly bint value
-  cpdef add_input(self, bint element)
-  @cython.locals(accumulator=AllAccumulator)
-  cpdef merge(self, accumulators)
-
-cdef class AnyAccumulator(object):
-  cdef readonly bint value
-  cpdef add_input(self, bint element)
-  @cython.locals(accumulator=AnyAccumulator)
-  cpdef merge(self, accumulators)
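
The .pxd file above is an augmenting definition file: it declares typed C
attributes and cpdef signatures for the plain Python classes defined in
cy_combiners.py (removed below), so the module gains extension-type speed when
compiled with Cython but still runs as pure Python otherwise. A minimal sketch
of the same pattern, using a hypothetical RunningTotal accumulator rather than
anything from the SDK:

    # running_total.pxd -- hypothetical example of the augmenting pattern.
    from libc.stdint cimport int64_t

    cdef class RunningTotal(object):
      cdef readonly int64_t value
      cpdef add_input(self, int64_t element)
      cpdef merge(self, accumulators)

    # running_total.py -- same module name; valid with or without Cython.
    class RunningTotal(object):
      def __init__(self):
        self.value = 0

      def add_input(self, element):
        self.value += element

      def merge(self, accumulators):
        for accumulator in accumulators:
          self.value += accumulator.value

The attribute and method declarations in the .pxd must match the definitions
in the .py file, which is why every accumulator class appears in both files of
this commit.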

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py b/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py
deleted file mode 100644
index 4cc4233..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/cy_combiners.py
+++ /dev/null
@@ -1,250 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A library of basic cythonized CombineFn subclasses."""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow.transforms import core
-
-
-class AccumulatorCombineFn(core.CombineFn):
-  # singleton?
-  def create_accumulator(self):
-    return self._accumulator_type()
-  @staticmethod
-  def add_input(accumulator, element):
-    accumulator.add_input(element)
-    return accumulator
-  def merge_accumulators(self, accumulators):
-    accumulator = self._accumulator_type()
-    accumulator.merge(accumulators)
-    return accumulator
-  @staticmethod
-  def extract_output(accumulator):
-    return accumulator.extract_output()
-  def __eq__(self, other):
-    return (isinstance(other, AccumulatorCombineFn)
-            and self._accumulator_type is other._accumulator_type)
-  def __hash__(self):
-    return hash(self._accumulator_type)
-
-
-_63 = 63  # Avoid large literals in C source code.
-globals()['INT64_MAX'] = 2**_63 - 1
-globals()['INT64_MIN'] = -2**_63
-
-
-class CountAccumulator(object):
-  def __init__(self):
-    self.value = 0
-  def add_input(self, unused_element):
-    self.value += 1
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.value += accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class SumInt64Accumulator(object):
-  def __init__(self):
-    self.value = 0
-  def add_input(self, element):
-    element = int(element)
-    if not INT64_MIN <= element <= INT64_MAX:
-      raise OverflowError(element)
-    self.value += element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.value += accumulator.value
-  def extract_output(self):
-    if not INT64_MIN <= self.value <= INT64_MAX:
-      self.value %= 2**64
-      if self.value > INT64_MAX:
-        self.value -= 2**64
-    return self.value
-
-
-class MinInt64Accumulator(object):
-  def __init__(self):
-    self.value = INT64_MAX
-  def add_input(self, element):
-    element = int(element)
-    if not INT64_MIN <= element <= INT64_MAX:
-      raise OverflowError(element)
-    if element < self.value:
-      self.value = element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      if accumulator.value < self.value:
-        self.value = accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class MaxInt64Accumulator(object):
-  def __init__(self):
-    self.value = INT64_MIN
-  def add_input(self, element):
-    element = int(element)
-    if not INT64_MIN <= element <= INT64_MAX:
-      raise OverflowError(element)
-    if element > self.value:
-      self.value = element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      if accumulator.value > self.value:
-        self.value = accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class MeanInt64Accumulator(object):
-  def __init__(self):
-    self.sum = 0
-    self.count = 0
-  def add_input(self, element):
-    element = int(element)
-    if not INT64_MIN <= element <= INT64_MAX:
-      raise OverflowError(element)
-    self.sum += element
-    self.count += 1
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.sum += accumulator.sum
-      self.count += accumulator.count
-  def extract_output(self):
-    if not INT64_MIN <= self.sum <= INT64_MAX:
-      self.sum %= 2**64
-      if self.sum > INT64_MAX:
-        self.sum -= 2**64
-    return self.sum / self.count if self.count else _NAN
-
-
-class CountCombineFn(AccumulatorCombineFn):
-  _accumulator_type = CountAccumulator
-class SumInt64Fn(AccumulatorCombineFn):
-  _accumulator_type = SumInt64Accumulator
-class MinInt64Fn(AccumulatorCombineFn):
-  _accumulator_type = MinInt64Accumulator
-class MaxInt64Fn(AccumulatorCombineFn):
-  _accumulator_type = MaxInt64Accumulator
-class MeanInt64Fn(AccumulatorCombineFn):
-  _accumulator_type = MeanInt64Accumulator
-
-
-_POS_INF = float('inf')
-_NEG_INF = float('-inf')
-_NAN = float('nan')
-
-
-class SumDoubleAccumulator(object):
-  def __init__(self):
-    self.value = 0
-  def add_input(self, element):
-    element = float(element)
-    self.value += element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.value += accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class MinDoubleAccumulator(object):
-  def __init__(self):
-    self.value = _POS_INF
-  def add_input(self, element):
-    element = float(element)
-    if element < self.value:
-      self.value = element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      if accumulator.value < self.value:
-        self.value = accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class MaxDoubleAccumulator(object):
-  def __init__(self):
-    self.value = _NEG_INF
-  def add_input(self, element):
-    element = float(element)
-    if element > self.value:
-      self.value = element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      if accumulator.value > self.value:
-        self.value = accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class MeanDoubleAccumulator(object):
-  def __init__(self):
-    self.sum = 0
-    self.count = 0
-  def add_input(self, element):
-    element = float(element)
-    self.sum += element
-    self.count += 1
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.sum += accumulator.sum
-      self.count += accumulator.count
-  def extract_output(self):
-    return self.sum / self.count if self.count else _NAN
-
-
-class SumFloatFn(AccumulatorCombineFn):
-  _accumulator_type = SumDoubleAccumulator
-class MinFloatFn(AccumulatorCombineFn):
-  _accumulator_type = MinDoubleAccumulator
-class MaxFloatFn(AccumulatorCombineFn):
-  _accumulator_type = MaxDoubleAccumulator
-class MeanFloatFn(AccumulatorCombineFn):
-  _accumulator_type = MeanDoubleAccumulator
-
-
-class AllAccumulator(object):
-  def __init__(self):
-    self.value = True
-  def add_input(self, element):
-    self.value &= not not element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.value &= accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class AnyAccumulator(object):
-  def __init__(self):
-    self.value = False
-  def add_input(self, element):
-    self.value |= not not element
-  def merge(self, accumulators):
-    for accumulator in accumulators:
-      self.value |= accumulator.value
-  def extract_output(self):
-    return self.value
-
-
-class AnyCombineFn(AccumulatorCombineFn):
-  _accumulator_type = AnyAccumulator
-
-class AllCombineFn(AccumulatorCombineFn):
-  _accumulator_type = AllAccumulator
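
Since each *Fn class above is simply a CombineFn wrapped around one of these
accumulators, they plug straight into the combining transforms from core.py.
A rough usage sketch, again assuming the pre-rename import paths and not part
of this commit:

    # Illustrative sketch, not part of this commit.
    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.transforms import cy_combiners
    from google.cloud.dataflow.transforms.core import (
        CombineGlobally, CombinePerKey, Create)

    p = Pipeline('DirectPipelineRunner')

    nums = p | Create('nums', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
    # MeanInt64Fn accumulates a (sum, count) pair per bundle and merges
    # the partial results, exactly as MeanInt64Accumulator does above.
    mean = nums | CombineGlobally('mean', cy_combiners.MeanInt64Fn())

    keyed = p | Create('keyed', [('a', 1), ('a', 0), ('b', 1)])
    # AnyCombineFn ORs the boolean view of each value per key.
    any_true = keyed | CombinePerKey('any', cy_combiners.AnyCombineFn())

    p.run()

When the SDK is built with Cython, the .pxd declarations above turn these
accumulators into extension types, so the per-element add_input calls in the
inner combining loop run as compiled code.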