Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:07 UTC

[32/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
new file mode 100644
index 0000000..b8142ea
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -0,0 +1,225 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for our libraries of combine PTransforms."""
+
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.transforms import combiners
+import google.cloud.dataflow.transforms.combiners as combine
+from google.cloud.dataflow.transforms.core import CombineGlobally
+from google.cloud.dataflow.transforms.core import Create
+from google.cloud.dataflow.transforms.core import Map
+from google.cloud.dataflow.transforms.ptransform import PTransform
+from google.cloud.dataflow.transforms.util import assert_that, equal_to
+
+
+class CombineTest(unittest.TestCase):
+
+  def test_builtin_combines(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+
+    vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
+    mean = sum(vals) / float(len(vals))
+    size = len(vals)
+
+    # First for global combines.
+    pcoll = pipeline | Create('start', vals)
+    result_mean = pcoll | combine.Mean.Globally('mean')
+    result_count = pcoll | combine.Count.Globally('count')
+    assert_that(result_mean, equal_to([mean]), label='assert:mean')
+    assert_that(result_count, equal_to([size]), label='assert:size')
+
+    # Again for per-key combines.
+    pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
+    result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
+    result_key_count = pcoll | combine.Count.PerKey('count-perkey')
+    assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
+    assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
+    pipeline.run()
+
+  def test_top(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+
+    # A parameter we'll be sharing with a custom comparator.
+    names = {0: 'zo',
+             1: 'one',
+             2: 'twoo',
+             3: 'three',
+             5: 'fiiive',
+             6: 'sssssix',
+             9: 'nniiinne'}
+
+    # First for global combines.
+    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | combine.Top.Largest('top', 5)
+    result_bot = pcoll | combine.Top.Smallest('bot', 4)
+    result_cmp = pcoll | combine.Top.Of(
+        'cmp',
+        6,
+        lambda a, b, names: len(names[a]) < len(names[b]),
+        names)  # Note parameter passed to comparator.
+    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
+    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
+    assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
+
+    # Again for per-key combines.
+    pcoll = pipeline | Create(
+        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
+    result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
+    result_key_cmp = pcoll | combine.Top.PerKey(
+        'cmp-perkey',
+        6,
+        lambda a, b, names: len(names[a]) < len(names[b]),
+        names)  # Note parameter passed to comparator.
+    assert_that(result_key_top, equal_to([('a', [9, 6, 6, 5, 3])]),
+                label='key:top')
+    assert_that(result_key_bot, equal_to([('a', [0, 1, 1, 1])]),
+                label='key:bot')
+    assert_that(result_key_cmp, equal_to([('a', [9, 6, 6, 5, 3, 2])]),
+                label='key:cmp')
+    pipeline.run()
+
+  def test_top_shorthands(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+
+    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | df.CombineGlobally('top', combiners.Largest(5))
+    result_bot = pcoll | df.CombineGlobally('bot', combiners.Smallest(4))
+    assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
+    assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
+
+    pcoll = pipeline | Create(
+        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    result_ktop = pcoll | df.CombinePerKey('top-perkey', combiners.Largest(5))
+    result_kbot = pcoll | df.CombinePerKey('bot-perkey', combiners.Smallest(4))
+    assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
+    assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
+    pipeline.run()
+
+  def test_sample(self):
+
+    # First test global samples (lots of them).
+    for ix in xrange(300):
+      pipeline = Pipeline('DirectPipelineRunner')
+      pcoll = pipeline | Create('start', [1, 1, 2, 2])
+      result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
+      def matcher():
+        def match(actual):
+          # There is always exactly one result.
+          equal_to([1])([len(actual)])
+          # There are always exactly three samples in the result.
+          equal_to([3])([len(actual[0])])
+          # Sampling is without replacement.
+          num_ones = sum(1 for x in actual[0] if x == 1)
+          num_twos = sum(1 for x in actual[0] if x == 2)
+          equal_to([1, 2])([num_ones, num_twos])
+        return match
+      assert_that(result, matcher())
+      pipeline.run()
+
+    # Now test per-key samples.
+    pipeline = Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | Create(
+        'start-perkey',
+        sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
+    result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+    def matcher():
+      def match(actual):
+        for _, samples in actual:
+          equal_to([3])([len(samples)])
+          num_ones = sum(1 for x in samples if x == 1)
+          num_twos = sum(1 for x in samples if x == 2)
+          equal_to([1, 2])([num_ones, num_twos])
+      return match
+    assert_that(result, matcher())
+    pipeline.run()
+
+  def test_tuple_combine_fn(self):
+    p = Pipeline('DirectPipelineRunner')
+    result = (
+        p
+        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+        | df.CombineGlobally(combine.TupleCombineFn(max,
+                                                    combine.MeanCombineFn(),
+                                                    sum)).without_defaults())
+    assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
+    p.run()
+
+  def test_tuple_combine_fn_without_defaults(self):
+    p = Pipeline('DirectPipelineRunner')
+    result = (
+        p
+        | Create([1, 1, 2, 3])
+        | df.CombineGlobally(
+            combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
+            .with_common_input()).without_defaults())
+    assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
+    p.run()
+
+  def test_to_list_and_to_dict(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
+    pcoll = pipeline | Create('start', the_list)
+    result = pcoll | combine.ToList('to list')
+    def matcher(expected):
+      def match(actual):
+        equal_to(expected[0])(actual[0])
+      return match
+    assert_that(result, matcher([the_list]))
+    pipeline.run()
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    pairs = [(1, 2), (3, 4), (5, 6)]
+    pcoll = pipeline | Create('start-pairs', pairs)
+    result = pcoll | combine.ToDict('to dict')
+    def matcher():
+      def match(actual):
+        equal_to([1])([len(actual)])
+        equal_to(pairs)(actual[0].iteritems())
+      return match
+    assert_that(result, matcher())
+    pipeline.run()
+
+  def test_combine_globally_with_default(self):
+    p = Pipeline('DirectPipelineRunner')
+    assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
+    p.run()
+
+  def test_combine_globally_without_default(self):
+    p = Pipeline('DirectPipelineRunner')
+    result = p | Create([]) | CombineGlobally(sum).without_defaults()
+    assert_that(result, equal_to([]))
+    p.run()
+
+  def test_combine_globally_with_default_side_input(self):
+    class CombineWithSideInput(PTransform):
+      def apply(self, pcoll):
+        side = pcoll | CombineGlobally(sum).as_singleton_view()
+        main = pcoll.pipeline | Create([None])
+        return main | Map(lambda _, s: s, side)
+
+    p = Pipeline('DirectPipelineRunner')
+    result1 = p | Create('label1', []) | CombineWithSideInput('L1')
+    result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
+    assert_that(result1, equal_to([0]), label='r1')
+    assert_that(result2, equal_to([10]), label='r2')
+    p.run()
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
new file mode 100644
index 0000000..6db0099
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -0,0 +1,1292 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Core PTransform subclasses, such as FlatMap, GroupByKey, and Map."""
+
+from __future__ import absolute_import
+
+import copy
+import uuid
+
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.coders import typecoders
+from google.cloud.dataflow.internal import util
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.pvalue import AsSingleton
+from google.cloud.dataflow.transforms import ptransform
+from google.cloud.dataflow.transforms import window
+from google.cloud.dataflow.transforms.ptransform import PTransform
+from google.cloud.dataflow.transforms.ptransform import ptransform_fn
+from google.cloud.dataflow.transforms.ptransform import PTransformWithSideInputs
+from google.cloud.dataflow.transforms.window import MIN_TIMESTAMP
+from google.cloud.dataflow.transforms.window import OutputTimeFn
+from google.cloud.dataflow.transforms.window import WindowedValue
+from google.cloud.dataflow.transforms.window import WindowFn
+from google.cloud.dataflow.typehints import Any
+from google.cloud.dataflow.typehints import get_type_hints
+from google.cloud.dataflow.typehints import is_consistent_with
+from google.cloud.dataflow.typehints import Iterable
+from google.cloud.dataflow.typehints import KV
+from google.cloud.dataflow.typehints import trivial_inference
+from google.cloud.dataflow.typehints import TypeCheckError
+from google.cloud.dataflow.typehints import Union
+from google.cloud.dataflow.typehints import WithTypeHints
+from google.cloud.dataflow.typehints.trivial_inference import element_type
+from google.cloud.dataflow.utils.options import TypeOptions
+
+
+class DoFnProcessContext(object):
+  """A processing context passed to DoFn methods during execution.
+
+  Most importantly, a DoFn.process method will access context.element
+  to get the element it is supposed to process.
+
+  Attributes:
+    label: label of the ParDo whose element is being processed.
+    element: element being processed
+      (in process method only; always None in start_bundle and finish_bundle)
+    timestamp: timestamp of the element
+      (in process method only; always None in start_bundle and finish_bundle)
+    windows: windows of the element
+      (in process method only; always None in start_bundle and finish_bundle)
+    state: a DoFnState object, which holds the runner's internal state
+      for this element.  For example, aggregator state is here.
+      Not used by the pipeline code.
+  """
+
+  def __init__(self, label, element=None, state=None):
+    """Initialize a processing context object with an element and state.
+
+    The element represents one value from a PCollection that will be accessed
+    by a DoFn object during pipeline execution, and state is an arbitrary object
+    where counters and other pipeline state information can be passed in.
+
+    DoFnProcessContext objects are also used as inputs to PartitionFn instances.
+
+    Args:
+      label: label of the PCollection whose element is being processed.
+      element: element of a PCollection being processed using this context.
+      state: a DoFnState object with state to be passed in to the DoFn object.
+    """
+    self.label = label
+    self.state = state
+    if element is not None:
+      self.set_element(element)
+
+  def set_element(self, windowed_value):
+    if windowed_value is None:
+      # Not currently processing an element.
+      if hasattr(self, 'element'):
+        del self.element
+        del self.timestamp
+        del self.windows
+    else:
+      self.element = windowed_value.value
+      self.timestamp = windowed_value.timestamp
+      self.windows = windowed_value.windows
+
+  def aggregate_to(self, aggregator, input_value):
+    """Provide a new input value for the aggregator.
+
+    Args:
+      aggregator: the aggregator to update
+      input_value: the new value to input to the combine_fn of this aggregator.
+    """
+    self.state.counter_for(aggregator).update(input_value)
+
+
+class DoFn(WithTypeHints):
+  """A function object used by a transform with custom processing.
+
+  The ParDo transform is such a transform. The ParDo.apply
+  method will take an object of type DoFn and apply it to all elements of a
+  PCollection object.
+
+  In order to have concrete DoFn objects one has to subclass from DoFn and
+  define the desired behavior (start_bundle/finish_bundle and process) or wrap a
+  callable object using the CallableWrapperDoFn class.
+  """
+
+  def default_label(self):
+    return self.__class__.__name__
+
+  def infer_output_type(self, input_type):
+    # TODO(robertwb): Side inputs types.
+    # TODO(robertwb): Assert compatibility with input type hint?
+    return self._strip_output_annotations(
+        trivial_inference.infer_return_type(self.process, [input_type]))
+
+  def start_bundle(self, context, *args, **kwargs):
+    """Called before a bundle of elements is processed on a worker.
+
+    Elements to be processed are split into bundles and distributed
+    to workers.  Before a worker calls process() on the first element
+    of its bundle, it calls this method.
+
+    Args:
+      context: a DoFnProcessContext object
+      *args: side inputs
+      **kwargs: keyword side inputs
+
+    """
+    pass
+
+  def finish_bundle(self, context, *args, **kwargs):
+    """Called after a bundle of elements is processed on a worker.
+
+    Args:
+      context: a DoFnProcessContext object
+      *args: side inputs
+      **kwargs: keyword side inputs
+    """
+    pass
+
+  def process(self, context, *args, **kwargs):
+    """Called for each element of a pipeline.
+
+    Args:
+      context: a DoFnProcessContext object containing, among other
+        attributes, the element to be processed.
+        See the DoFnProcessContext documentation for details.
+      *args: side inputs
+      **kwargs: keyword side inputs
+    """
+    raise NotImplementedError
+
+  @staticmethod
+  def from_callable(fn):
+    return CallableWrapperDoFn(fn)
+
+  def process_argspec_fn(self):
+    """Returns the Python callable that will eventually be invoked.
+
+    This should ideally be the user-level function that is called with
+    the main and (if any) side inputs, and is used to relate the type
+    hint parameters with the input parameters (e.g., by argument name).
+    """
+    return self.process
+
+  def _strip_output_annotations(self, type_hint):
+    annotations = (window.TimestampedValue, window.WindowedValue,
+                   pvalue.SideOutputValue)
+    # TODO(robertwb): These should be parameterized types that the
+    # type inferencer understands.
+    if (type_hint in annotations
+        or trivial_inference.element_type(type_hint) in annotations):
+      return Any
+    else:
+      return type_hint
+
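A minimal sketch of the DoFn contract described above, reusing the DirectPipelineRunner pattern from combiners_test.py earlier in this commit; SplitWordsFn is a hypothetical example, not part of the SDK:

    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.transforms.core import Create, DoFn, ParDo

    class SplitWordsFn(DoFn):
      def process(self, context):
        # context.element holds the current value; process() must return an
        # iterable, whose elements are flattened into the output PCollection.
        return context.element.split()

    p = Pipeline('DirectPipelineRunner')
    words = (p
             | Create('lines', ['to be', 'or not'])
             | ParDo('split', SplitWordsFn()))
    p.run()  # words contains 'to', 'be', 'or', 'not'.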
+
+class CallableWrapperDoFn(DoFn):
+  """A DoFn (function) object wrapping a callable object.
+
+  The purpose of this class is to conveniently wrap simple functions and use
+  them in transforms.
+  """
+
+  def __init__(self, fn):
+    """Initializes a CallableWrapperDoFn object wrapping a callable.
+
+    Args:
+      fn: A callable object.
+
+    Raises:
+      TypeError: if fn parameter is not a callable type.
+    """
+    if not callable(fn):
+      raise TypeError('Expected a callable object instead of: %r' % fn)
+
+    self._fn = fn
+
+    super(CallableWrapperDoFn, self).__init__()
+
+  def __repr__(self):
+    return 'CallableWrapperDoFn(%s)' % self._fn
+
+  def default_type_hints(self):
+    type_hints = get_type_hints(self._fn)
+    # If the fn was a DoFn annotated with a type-hint that hinted a return
+    # type compatible with Iterable[Any], then we strip off the outer
+    # container type due to the 'flatten' portion of FlatMap.
+    # TODO(robertwb): Should we require an iterable specification for FlatMap?
+    if type_hints.output_types:
+      args, kwargs = type_hints.output_types
+      if len(args) == 1 and is_consistent_with(args[0], Iterable[Any]):
+        type_hints = type_hints.copy()
+        type_hints.set_output_types(element_type(args[0]), **kwargs)
+    return type_hints
+
+  def infer_output_type(self, input_type):
+    return self._strip_output_annotations(
+        trivial_inference.infer_return_type(self._fn, [input_type]))
+
+  def process(self, context, *args, **kwargs):
+    return self._fn(context.element, *args, **kwargs)
+
+  def process_argspec_fn(self):
+    return getattr(self._fn, '_argspec_fn', self._fn)
+
+
+class CombineFn(WithTypeHints):
+  """A function object used by a Combine transform with custom processing.
+
+  A CombineFn specifies how multiple values in all or part of a PCollection can
+  be merged into a single value---essentially providing the same kind of
+  information as the arguments to the Python "reduce" builtin (except for the
+  input argument, which is an instance of CombineFnProcessContext). The
+  combining process proceeds as follows:
+
+  1. Input values are partitioned into one or more batches.
+  2. For each batch, the create_accumulator method is invoked to create a fresh
+     initial "accumulator" value representing the combination of zero values.
+  3. For each input value in the batch, the add_input method is invoked to
+     combine more values with the accumulator for that batch.
+  4. The merge_accumulators method is invoked to combine accumulators from
+     separate batches into a single combined output accumulator value, once all
+     of the accumulators have had all the input values in their batches added to
+     them. This operation is invoked repeatedly, until there is only one
+     accumulator value left.
+  5. The extract_output operation is invoked on the final accumulator to get
+     the output value.
+  """
+
+  def default_label(self):
+    return self.__class__.__name__
+
+  def create_accumulator(self, *args, **kwargs):
+    """Return a fresh, empty accumulator for the combine operation.
+
+    Args:
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    raise NotImplementedError(str(self))
+
+  def add_input(self, accumulator, element, *args, **kwargs):
+    """Return result of folding element into accumulator.
+
+    CombineFn implementors must override either add_input or add_inputs.
+
+    Args:
+      accumulator: the current accumulator
+      element: the element to add
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    raise NotImplementedError(str(self))
+
+  def add_inputs(self, accumulator, elements, *args, **kwargs):
+    """Returns the result of folding each element in elements into accumulator.
+
+    This is provided in case the implementation affords more efficient
+    bulk addition of elements. The default implementation simply loops
+    over the inputs invoking add_input for each one.
+
+    Args:
+      accumulator: the current accumulator
+      elements: the elements to add
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    for element in elements:
+      accumulator = self.add_input(accumulator, element, *args, **kwargs)
+    return accumulator
+
+  def merge_accumulators(self, accumulators, *args, **kwargs):
+    """Returns the result of merging several accumulators
+    into a single accumulator value.
+
+    Args:
+      accumulators: the accumulators to merge
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    raise NotImplementedError(str(self))
+
+  def extract_output(self, accumulator, *args, **kwargs):
+    """Return result of converting accumulator into the output value.
+
+    Args:
+      accumulator: the final accumulator value computed by this CombineFn
+        for the entire input key or PCollection.
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    raise NotImplementedError(str(self))
+
+  def apply(self, elements, *args, **kwargs):
+    """Returns result of applying this CombineFn to the input values.
+
+    Args:
+      elements: the set of values to combine.
+      *args: Additional arguments and side inputs.
+      **kwargs: Additional arguments and side inputs.
+    """
+    return self.extract_output(
+        self.add_inputs(
+            self.create_accumulator(*args, **kwargs), elements,
+            *args, **kwargs),
+        *args, **kwargs)
+
+  def for_input_type(self, input_type):
+    """Returns a specialized implementation of self, if it exists.
+
+    Otherwise, returns self.
+
+    Args:
+      input_type: the type of input elements.
+    """
+    return self
+
+  @staticmethod
+  def from_callable(fn):
+    return CallableWrapperCombineFn(fn)
+
+  @staticmethod
+  def maybe_from_callable(fn):
+    return fn if isinstance(fn, CombineFn) else CallableWrapperCombineFn(fn)
+
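As an illustration of the create_accumulator / add_input / merge_accumulators / extract_output lifecycle described above, a hypothetical averaging combiner might look like the sketch below (side-input arguments omitted for brevity; AverageFn is not part of the SDK):

    from google.cloud.dataflow.transforms.core import CombineFn

    class AverageFn(CombineFn):
      def create_accumulator(self):
        return (0.0, 0)                       # (running sum, count)

      def add_input(self, accumulator, element):
        (s, n) = accumulator
        return (s + element, n + 1)

      def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

      def extract_output(self, accumulator):
        (s, n) = accumulator
        return s / n if n else float('NaN')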
+
+class CallableWrapperCombineFn(CombineFn):
+  """A CombineFn (function) object wrapping a callable object.
+
+  The purpose of this class is to conveniently wrap simple functions and use
+  them in Combine transforms.
+  """
+  _EMPTY = object()
+
+  def __init__(self, fn):
+    """Initializes a CallableFn object wrapping a callable.
+
+    Args:
+      fn: A callable object that reduces elements of an iterable to a single
+        value (like the builtins sum and max). This callable must be capable of
+        receiving the kind of values it generates as output in its input, and
+        for best results, its operation must be commutative and associative.
+
+    Raises:
+      TypeError: if fn parameter is not a callable type.
+    """
+    if not callable(fn):
+      raise TypeError('Expected a callable object instead of: %r' % fn)
+
+    super(CallableWrapperCombineFn, self).__init__()
+    self._fn = fn
+
+  def __repr__(self):
+    return "CallableWrapperCombineFn(%s)" % self._fn
+
+  def create_accumulator(self, *args, **kwargs):
+    return self._EMPTY
+
+  def add_input(self, accumulator, element, *args, **kwargs):
+    if accumulator is self._EMPTY:
+      return element
+    else:
+      return self._fn([accumulator, element], *args, **kwargs)
+
+  def add_inputs(self, accumulator, elements, *args, **kwargs):
+    if accumulator is self._EMPTY:
+      return self._fn(elements, *args, **kwargs)
+    elif isinstance(elements, (list, tuple)):
+      return self._fn([accumulator] + elements, *args, **kwargs)
+    else:
+      def union():
+        yield accumulator
+        for e in elements:
+          yield e
+      return self._fn(union(), *args, **kwargs)
+
+  def merge_accumulators(self, accumulators, *args, **kwargs):
+    # It's (weakly) assumed that self._fn is associative.
+    return self._fn(accumulators, *args, **kwargs)
+
+  def extract_output(self, accumulator, *args, **kwargs):
+    return self._fn(()) if accumulator is self._EMPTY else accumulator
+
+  def default_type_hints(self):
+    fn_hints = get_type_hints(self._fn)
+    if fn_hints.input_types is None:
+      return fn_hints
+    else:
+      # fn(Iterable[V]) -> V becomes CombineFn(V) -> V
+      input_args, input_kwargs = fn_hints.input_types
+      if not input_args:
+        if len(input_kwargs) == 1:
+          input_args, input_kwargs = tuple(input_kwargs.values()), {}
+        else:
+          raise TypeError('Combiner input type must be specified positionally.')
+      if not is_consistent_with(input_args[0], Iterable[Any]):
+        raise TypeCheckError(
+            'All functions for a Combine PTransform must accept a '
+            'single argument compatible with: Iterable[Any]. '
+            'Instead a function with input type: %s was received.'
+             % input_args[0])
+      input_args = (element_type(input_args[0]),) + input_args[1:]
+      # TODO(robertwb): Assert output type is consistent with input type?
+      hints = fn_hints.copy()
+      hints.set_input_types(*input_args, **input_kwargs)
+      return hints
+
+  def for_input_type(self, input_type):
+    # Avoid circular imports.
+    from google.cloud.dataflow.transforms import cy_combiners
+    if self._fn is any:
+      return cy_combiners.AnyCombineFn()
+    elif self._fn is all:
+      return cy_combiners.AllCombineFn()
+    else:
+      known_types = {
+          (sum, int): cy_combiners.SumInt64Fn(),
+          (min, int): cy_combiners.MinInt64Fn(),
+          (max, int): cy_combiners.MaxInt64Fn(),
+          (sum, float): cy_combiners.SumFloatFn(),
+          (min, float): cy_combiners.MinFloatFn(),
+          (max, float): cy_combiners.MaxFloatFn(),
+      }
+    return known_types.get((self._fn, input_type), self)
+
+
+class PartitionFn(WithTypeHints):
+  """A function object used by a Partition transform.
+
+  A PartitionFn specifies how individual values in a PCollection will be placed
+  into separate partitions, indexed by an integer.
+  """
+
+  def default_label(self):
+    return self.__class__.__name__
+
+  def partition_for(self, context, num_partitions, *args, **kwargs):
+    """Specify which partition will receive this element.
+
+    Args:
+      context: A DoFnProcessContext containing an element of the
+        input PCollection.
+      num_partitions: Number of partitions, i.e., output PCollections.
+      *args: optional parameters and side inputs.
+      **kwargs: optional parameters and side inputs.
+
+    Returns:
+      An integer in [0, num_partitions).
+    """
+    pass
+
+
+class CallableWrapperPartitionFn(PartitionFn):
+  """A PartitionFn object wrapping a callable object.
+
+  Instances of this class wrap simple functions for use in Partition operations.
+  """
+
+  def __init__(self, fn):
+    """Initializes a PartitionFn object wrapping a callable.
+
+    Args:
+      fn: A callable object, which should accept the following arguments:
+            element - element to assign to a partition.
+            num_partitions - number of output partitions.
+          and may accept additional arguments and side inputs.
+
+    Raises:
+      TypeError: if fn is not a callable type.
+    """
+    if not callable(fn):
+      raise TypeError('Expected a callable object instead of: %r' % fn)
+    self._fn = fn
+
+  def partition_for(self, context, num_partitions, *args, **kwargs):
+    return self._fn(context.element, num_partitions, *args, **kwargs)
+
+
+class ParDo(PTransformWithSideInputs):
+  """A ParDo transform.
+
+  Processes an input PCollection by applying a DoFn to each element and
+  returning the accumulated results into an output PCollection. The type of the
+  elements is not fixed as long as the DoFn can deal with it. In reality
+  the type is constrained to some extent because the elements sometimes must be
+  persisted to external storage. See the apply() method comments for a detailed
+  description of all possible arguments.
+
+  Note that the DoFn must return an iterable for each element of the input
+  PCollection.  An easy way to do this is to use the yield keyword in the
+  process method.
+
+  Args:
+      label: name of this transform instance. Useful while monitoring and
+        debugging a pipeline execution.
+      pcoll: a PCollection to be processed.
+      dofn: a DoFn object to be applied to each element of pcoll argument.
+      *args: positional arguments passed to the dofn object.
+      **kwargs:  keyword arguments passed to the dofn object.
+
+  Note that the positional and keyword arguments will be processed in order
+  to detect PCollections that will be computed as side inputs to the
+  transform. During pipeline execution whenever the DoFn object gets executed
+  (its process() method gets called) the PCollection arguments will be replaced
+  by values from the PCollection in the exact positions where they appear in
+  the argument lists.
+  """
+
+  def __init__(self, fn_or_label, *args, **kwargs):
+    super(ParDo, self).__init__(fn_or_label, *args, **kwargs)
+
+    if not isinstance(self.fn, DoFn):
+      raise TypeError('ParDo must be called with a DoFn instance.')
+
+  def default_type_hints(self):
+    return self.fn.get_type_hints()
+
+  def infer_output_type(self, input_type):
+    return trivial_inference.element_type(
+        self.fn.infer_output_type(input_type))
+
+  def make_fn(self, fn):
+    return fn if isinstance(fn, DoFn) else CallableWrapperDoFn(fn)
+
+  def process_argspec_fn(self):
+    return self.fn.process_argspec_fn()
+
+  def apply(self, pcoll):
+    self.side_output_tags = set()
+    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
+    self.dofn = self.fn
+    return pvalue.PCollection(pcoll.pipeline)
+
+  def with_outputs(self, *tags, **main_kw):
+    """Returns a tagged tuple allowing access to the outputs of a ParDo.
+
+    The resulting object supports access to the
+    PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over
+    the available tags (e.g., for tag in o: ...).
+
+    Args:
+      *tags: if non-empty, list of valid tags. If a list of valid tags is given,
+        it will be an error to use an undeclared tag later in the pipeline.
+      **main_kw: dictionary empty or with one key 'main' defining the tag to be
+        used for the main output (which will not have a tag associated with it).
+
+    Returns:
+      An object of type DoOutputsTuple that bundles together all the outputs
+      of a ParDo transform and allows accessing the individual
+      PCollections for each output using an object.tag syntax.
+
+    Raises:
+      TypeError: if the self object is not a PCollection that is the result of
+        a ParDo transform.
+      ValueError: if main_kw contains any key other than 'main'.
+    """
+    main_tag = main_kw.pop('main', None)
+    if main_kw:
+      raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
+    return _MultiParDo(self, tags, main_tag)
+
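A sketch of with_outputs in use, assuming a PCollection of integers named numbers; DivideFn and the tag names are hypothetical:

    from google.cloud.dataflow import pvalue

    class DivideFn(DoFn):
      def process(self, context):
        if context.element % 2 == 0:
          yield pvalue.SideOutputValue('even', context.element)
        else:
          yield context.element            # goes to the main output

    results = numbers | ParDo('divide', DivideFn()).with_outputs('even', main='odd')
    odds, evens = results.odd, results['even']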
+
+class _MultiParDo(PTransform):
+
+  def __init__(self, do_transform, tags, main_tag):
+    super(_MultiParDo, self).__init__(do_transform.label)
+    self._do_transform = do_transform
+    self._tags = tags
+    self._main_tag = main_tag
+
+  def apply(self, pcoll):
+    _ = pcoll | self._do_transform
+    return pvalue.DoOutputsTuple(
+        pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
+
+
+def FlatMap(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
+  """FlatMap is like ParDo except it takes a callable to specify the
+  transformation.
+
+  The callable must return an iterable for each element of the input
+  PCollection.  The elements of these iterables will be flattened into
+  the output PCollection.
+
+  Args:
+    fn_or_label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    *args: positional arguments passed to the transform callable.
+    **kwargs: keyword arguments passed to the transform callable.
+
+  Returns:
+    A PCollection containing the FlatMap outputs.
+
+  Raises:
+    TypeError: If the fn passed as argument is not a callable. Typical error
+      is to pass a DoFn instance which is supported only for ParDo.
+  """
+  if fn_or_label is None or isinstance(fn_or_label, str):
+    label, fn, args = fn_or_label, args[0], args[1:]
+  else:
+    label, fn = None, fn_or_label
+  if not callable(fn):
+    raise TypeError(
+        'FlatMap can be used only with callable objects. '
+        'Received %r instead for %s argument.'
+        % (fn, 'first' if label is None else 'second'))
+
+  if label is None:
+    label = 'FlatMap(%s)' % ptransform.label_from_callable(fn)
+
+  return ParDo(label, CallableWrapperDoFn(fn), *args, **kwargs)
+
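For example (a sketch, assuming a Pipeline p built as in the tests above):

    lines = p | Create('lines', ['a b', 'c d e'])
    words = lines | FlatMap('split', lambda line: line.split())
    # words contains 'a', 'b', 'c', 'd', 'e' -- each returned iterable is flattened.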
+
+def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
+  """Map is like FlatMap except its callable returns only a single element.
+
+  Args:
+    fn_or_label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    *args: positional arguments passed to the transform callable.
+    **kwargs: keyword arguments passed to the transform callable.
+
+  Returns:
+    A PCollection containing the Map outputs.
+
+  Raises:
+    TypeError: If the fn passed as argument is not a callable. Typical error
+      is to pass a DoFn instance which is supported only for ParDo.
+  """
+  if isinstance(fn_or_label, str):
+    label, fn, args = fn_or_label, args[0], args[1:]
+  else:
+    label, fn = None, fn_or_label
+  if not callable(fn):
+    raise TypeError(
+        'Map can be used only with callable objects. '
+        'Received %r instead for %s argument.'
+        % (fn, 'first' if label is None else 'second'))
+  wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+
+  # Proxy the type-hint information from the original function to this new
+  # wrapped function.
+  get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
+  output_hint = get_type_hints(fn).simple_output_type(label)
+  if output_hint:
+    get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint])
+  # pylint: disable=protected-access
+  wrapper._argspec_fn = fn
+  # pylint: enable=protected-access
+
+  if label is None:
+    label = 'Map(%s)' % ptransform.label_from_callable(fn)
+
+  return FlatMap(label, wrapper, *args, **kwargs)
+
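By contrast, Map wraps the callable so each single return value becomes a one-element iterable before delegating to FlatMap; a sketch, assuming the same Pipeline p:

    doubled = p | Create('nums', [1, 2, 3]) | Map('double', lambda x: x * 2)
    # doubled contains 2, 4, 6.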
+
+def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
+  """Filter is a FlatMap with its callable filtering out elements.
+
+  Args:
+    fn_or_label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    *args: positional arguments passed to the transform callable.
+    **kwargs: keyword arguments passed to the transform callable.
+
+  Returns:
+    A PCollection containing the Filter outputs.
+
+  Raises:
+    TypeError: If the fn passed as argument is not a callable. Typical error
+      is to pass a DoFn instance which is supported only for ParDo.
+  """
+  if isinstance(fn_or_label, str):
+    label, fn, args = fn_or_label, args[0], args[1:]
+  else:
+    label, fn = None, fn_or_label
+  if not callable(fn):
+    raise TypeError(
+        'Filter can be used only with callable objects. '
+        'Received %r instead for %s argument.'
+        % (fn, 'first' if label is None else 'second'))
+  wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else []
+
+  # Proxy the type-hint information from the function being wrapped, setting the
+  # output type to be the same as the input type.
+  get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
+  output_hint = get_type_hints(fn).simple_output_type(label)
+  if (output_hint is None
+      and get_type_hints(wrapper).input_types
+      and get_type_hints(wrapper).input_types[0]):
+    output_hint = get_type_hints(wrapper).input_types[0]
+  if output_hint:
+    get_type_hints(wrapper).set_output_types(typehints.Iterable[output_hint])
+  # pylint: disable=protected-access
+  wrapper._argspec_fn = fn
+  # pylint: enable=protected-access
+
+  if label is None:
+    label = 'Filter(%s)' % ptransform.label_from_callable(fn)
+
+  return FlatMap(label, wrapper, *args, **kwargs)
+
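A sketch of Filter, which keeps only the elements for which the callable returns a truthy value (again assuming a Pipeline p as above):

    evens = p | Create('nums', [1, 2, 3, 4]) | Filter('even', lambda x: x % 2 == 0)
    # evens contains 2 and 4.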
+
+class CombineGlobally(PTransform):
+  """A CombineGlobally transform.
+
+  Reduces a PCollection to a single value by progressively applying a CombineFn
+  to portions of the PCollection (and to intermediate values created thereby).
+  See documentation in CombineFn for details on the specifics on how CombineFns
+  are applied.
+
+  Args:
+    label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    pcoll: a PCollection to be reduced into a single value.
+    fn: a CombineFn object that will be called to progressively reduce the
+      PCollection into single values, or a callable suitable for wrapping
+      by CallableWrapperCombineFn.
+    *args: positional arguments passed to the CombineFn object.
+    **kwargs: keyword arguments passed to the CombineFn object.
+
+  Raises:
+    TypeError: If the output type of the input PCollection is not compatible
+      with Iterable[A].
+
+  Returns:
+    A single-element PCollection containing the main output of the Combine
+    transform.
+
+  Note that the positional and keyword arguments will be processed in order
+  to detect PObjects that will be computed as side inputs to the transform.
+  During pipeline execution whenever the CombineFn object gets executed (i.e.,
+  any of the CombineFn methods get called), the PObject arguments will be
+  replaced by their actual value in the exact position where they appear in
+  the argument lists.
+  """
+  has_defaults = True
+  as_view = False
+
+  def __init__(self, label_or_fn, *args, **kwargs):
+    if label_or_fn is None or isinstance(label_or_fn, str):
+      label, fn, args = label_or_fn, args[0], args[1:]
+    else:
+      label, fn = None, label_or_fn
+
+    super(CombineGlobally, self).__init__(label)
+    self.fn = fn
+    self.args = args
+    self.kwargs = kwargs
+
+  def default_label(self):
+    return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn)
+
+  def clone(self, **extra_attributes):
+    clone = copy.copy(self)
+    clone.__dict__.update(extra_attributes)
+    return clone
+
+  def with_defaults(self, has_defaults=True):
+    return self.clone(has_defaults=has_defaults)
+
+  def without_defaults(self):
+    return self.with_defaults(False)
+
+  def as_singleton_view(self):
+    return self.clone(as_view=True)
+
+  def apply(self, pcoll):
+    def add_input_types(transform):
+      type_hints = self.get_type_hints()
+      if type_hints.input_types:
+        return transform.with_input_types(type_hints.input_types[0][0])
+      else:
+        return transform
+
+    combined = (pcoll
+            | add_input_types(Map('KeyWithVoid', lambda v: (None, v))
+               .with_output_types(KV[None, pcoll.element_type]))
+            | CombinePerKey('CombinePerKey', self.fn, *self.args, **self.kwargs)
+            | Map('UnKey', lambda (k, v): v))
+
+    if not self.has_defaults and not self.as_view:
+      return combined
+
+    if self.has_defaults:
+      combine_fn = (
+          self.fn if isinstance(self.fn, CombineFn)
+          else CombineFn.from_callable(self.fn))
+      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+    else:
+      default_value = pvalue._SINGLETON_NO_DEFAULT  # pylint: disable=protected-access
+    view = pvalue.AsSingleton(combined, default_value=default_value)
+    if self.as_view:
+      return view
+    else:
+      if pcoll.windowing.windowfn != window.GlobalWindows():
+        raise ValueError(
+            "Default values are not yet supported in CombineGlobally() if the "
+            "output  PCollection is not windowed by GlobalWindows. "
+            "Instead, use CombineGlobally().without_defaults() to output "
+            "an empty PCollection if the input PCollection is empty, "
+            "or CombineGlobally().as_singleton_view() to get the default "
+            "output of the CombineFn if the input PCollection is empty.")
+      def typed(transform):
+        # TODO(robertwb): We should infer this.
+        if combined.element_type:
+          return transform.with_output_types(combined.element_type)
+        else:
+          return transform
+      return (pcoll.pipeline
+              | Create('DoOnce', [None])
+              | typed(Map('InjectDefault', lambda _, s: s, view)))
+
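A sketch contrasting the default behavior with without_defaults, in the spirit of test_combine_globally_with_default in the test file above:

    total = p | Create('nums', [1, 2, 3]) | CombineGlobally('total', sum)
    # total contains 6; on an empty input it would contain sum([]) == 0.
    no_default = p | Create('empty', []) | CombineGlobally(sum).without_defaults()
    # no_default stays empty when the input is empty.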
+
+@ptransform_fn
+def CombinePerKey(label, pcoll, fn, *args, **kwargs):  # pylint: disable=invalid-name
+  """A per-key Combine transform.
+
+  Identifies sets of values associated with the same key in the input
+  PCollection, then applies a CombineFn to condense those sets to single
+  values. See documentation in CombineFn for details on the specifics on how
+  CombineFns are applied.
+
+  Args:
+    label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    pcoll: input pcollection.
+    fn: instance of CombineFn to apply to all values under the same key in
+      pcoll, or a callable whose signature is f(iterable, *args, **kwargs)
+      (e.g., sum, max).
+    *args: arguments and side inputs, passed directly to the CombineFn.
+    **kwargs: arguments and side inputs, passed directly to the CombineFn.
+
+  Returns:
+    A PCollection of (key, combined value) tuples produced by the combine
+    operation.
+  """
+  return pcoll | GroupByKey() | CombineValues('Combine', fn, *args, **kwargs)
+
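A per-key sketch using the builtin sum as the combining callable:

    totals = (p
              | Create('pairs', [('a', 1), ('a', 2), ('b', 5)])
              | CombinePerKey('sum_values', sum))
    # totals contains ('a', 3) and ('b', 5).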
+
+# TODO(robertwb): Rename to CombineGroupedValues?
+class CombineValues(PTransformWithSideInputs):
+
+  def make_fn(self, fn):
+    return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn)
+
+  def apply(self, pcoll):
+    args, kwargs = util.insert_values_in_args(
+      self.args, self.kwargs, self.side_inputs)
+
+    input_type = pcoll.element_type
+    key_type = None
+    if input_type is not None:
+      key_type, _ = input_type.tuple_types
+
+    runtime_type_check = (
+        pcoll.pipeline.options is not None and
+        pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
+    return pcoll | ParDo(
+        CombineValuesDoFn(key_type, self.fn, runtime_type_check),
+        *args, **kwargs)
+
+
+class CombineValuesDoFn(DoFn):
+  """DoFn for performing per-key Combine transforms."""
+
+  def __init__(self, input_pcoll_type, combinefn, runtime_type_check):
+    super(CombineValuesDoFn, self).__init__()
+    self.combinefn = combinefn
+    self.runtime_type_check = runtime_type_check
+
+  def process(self, p_context, *args, **kwargs):
+    # Expected elements input to this DoFn are 2-tuples of the form
+    # (key, iter), with iter an iterable of all the values associated with key
+    # in the input PCollection.
+    if self.runtime_type_check:
+      # Apply the combiner in a single operation rather than artificially
+      # breaking it up so that output type violations manifest as TypeCheck
+      # errors rather than type errors.
+      return [
+          (p_context.element[0],
+           self.combinefn.apply(p_context.element[1], *args, **kwargs))]
+    else:
+      # Add the elements into three accumulators (for testing of merge).
+      elements = p_context.element[1]
+      accumulators = []
+      for k in range(3):
+        if len(elements) <= k:
+          break
+        accumulators.append(
+            self.combinefn.add_inputs(
+                self.combinefn.create_accumulator(*args, **kwargs),
+                elements[k::3],
+                *args, **kwargs))
+      # Merge the accumulators.
+      accumulator = self.combinefn.merge_accumulators(
+          accumulators, *args, **kwargs)
+      # Convert accumulator to the final result.
+      return [(p_context.element[0],
+               self.combinefn.extract_output(accumulator, *args, **kwargs))]
+
+  def default_type_hints(self):
+    hints = self.combinefn.get_type_hints().copy()
+    if hints.input_types:
+      K = typehints.TypeVariable('K')
+      args, kwargs = hints.input_types
+      args = (typehints.Tuple[K, typehints.Iterable[args[0]]],) + args[1:]
+      hints.set_input_types(*args, **kwargs)
+    else:
+      K = typehints.Any
+    if hints.output_types:
+      main_output_type = hints.simple_output_type('')
+      hints.set_output_types(typehints.Tuple[K, main_output_type])
+    return hints
+
+
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class GroupByKey(PTransform):
+  """A group by key transform.
+
+  Processes an input PCollection consisting of key/value pairs represented as a
+  tuple pair. The result is a PCollection where values having a common key are
+  grouped together.  For example, (a, 1), (b, 2), (a, 3) will result in
+  (a, [1, 3]), (b, [2]).
+
+  The implementation here is used only when run on the local direct runner.
+  """
+
+  class ReifyWindows(DoFn):
+
+    def process(self, context):
+      try:
+        k, v = context.element
+      except TypeError:
+        raise TypeCheckError('Input to GroupByKey must be a PCollection with '
+                             'elements compatible with KV[A, B]')
+
+      return [(k, window.WindowedValue(v, context.timestamp, context.windows))]
+
+    def infer_output_type(self, input_type):
+      key_type, value_type = trivial_inference.key_value_types(input_type)
+      return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
+
+  class GroupAlsoByWindow(DoFn):
+    # TODO(robertwb): Support combiner lifting.
+
+    def __init__(self, windowing):
+      super(GroupByKey.GroupAlsoByWindow, self).__init__()
+      self.windowing = windowing
+
+    def infer_output_type(self, input_type):
+      key_type, windowed_value_iter_type = trivial_inference.key_value_types(
+          input_type)
+      value_type = windowed_value_iter_type.inner_type.inner_type
+      return Iterable[KV[key_type, Iterable[value_type]]]
+
+    def process(self, context):
+      k, vs = context.element
+      # pylint: disable=g-import-not-at-top
+      from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState
+      from google.cloud.dataflow.transforms.trigger import create_trigger_driver
+      # pylint: enable=g-import-not-at-top
+      driver = create_trigger_driver(self.windowing, True)
+      state = InMemoryUnmergedState()
+      # TODO(robertwb): Conditionally process in smaller chunks.
+      for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
+        yield wvalue.with_value((k, wvalue.value))
+      while state.timers:
+        fired = state.get_and_clear_timers()
+        for timer_window, (name, time_domain, fire_time) in fired:
+          for wvalue in driver.process_timer(
+              timer_window, name, time_domain, fire_time, state):
+            yield wvalue.with_value((k, wvalue.value))
+
+  def apply(self, pcoll):
+    # This code path is only used in the local direct runner.  For Dataflow
+    # runner execution, the GroupByKey transform is expanded on the service.
+    input_type = pcoll.element_type
+
+    if input_type is not None:
+      # Initialize type-hints used below to enforce type-checking and to pass
+      # downstream to further PTransforms.
+      key_type, value_type = trivial_inference.key_value_types(input_type)
+      typecoders.registry.verify_deterministic(
+          typecoders.registry.get_coder(key_type),
+          'GroupByKey operation "%s"' % self.label)
+
+      reify_output_type = KV[key_type, typehints.WindowedValue[value_type]]
+      gbk_input_type = KV[key_type, Iterable[typehints.WindowedValue[value_type]]]
+      gbk_output_type = KV[key_type, Iterable[value_type]]
+
+      return (pcoll
+              | (ParDo('reify_windows', self.ReifyWindows())
+                 .with_output_types(reify_output_type))
+              | (GroupByKeyOnly('group_by_key')
+                 .with_input_types(reify_output_type)
+                 .with_output_types(gbk_input_type))
+              | (ParDo('group_by_window',
+                       self.GroupAlsoByWindow(pcoll.windowing))
+                 .with_input_types(gbk_input_type)
+                 .with_output_types(gbk_output_type)))
+    else:
+      return (pcoll
+              | ParDo('reify_windows', self.ReifyWindows())
+              | GroupByKeyOnly('group_by_key')
+              | ParDo('group_by_window',
+                      self.GroupAlsoByWindow(pcoll.windowing)))
+
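A sketch matching the docstring's example:

    grouped = (p
               | Create('kvs', [('a', 1), ('b', 2), ('a', 3)])
               | GroupByKey('group'))
    # grouped contains ('a', [1, 3]) and ('b', [2]) (value order not guaranteed).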
+
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
+class GroupByKeyOnly(PTransform):
+  """A group by key transform, ignoring windows."""
+
+  def __init__(self, label=None):
+    super(GroupByKeyOnly, self).__init__(label)
+
+  def infer_output_type(self, input_type):
+    key_type, value_type = trivial_inference.key_value_types(input_type)
+    return KV[key_type, Iterable[value_type]]
+
+  def apply(self, pcoll):
+    self._check_pcollection(pcoll)
+    return pvalue.PCollection(pcoll.pipeline)
+
+
+class Partition(PTransformWithSideInputs):
+  """Split a PCollection into several partitions.
+
+  Uses the specified PartitionFn to separate an input PCollection into the
+  specified number of sub-PCollections.
+
+  When apply()d, a Partition() PTransform requires the following:
+
+  Args:
+    partitionfn: a PartitionFn, or a callable with the signature described in
+      CallableWrapperPartitionFn.
+    n: number of output partitions.
+
+  The result of this PTransform is a simple list of the output PCollections
+  representing each of n partitions, in order.
+  """
+
+  class ApplyPartitionFnFn(DoFn):
+    """A DoFn that applies a PartitionFn."""
+
+    def process(self, context, partitionfn, n, *args, **kwargs):
+      partition = partitionfn.partition_for(context, n, *args, **kwargs)
+      if not 0 <= partition < n:
+        raise ValueError(
+            'PartitionFn specified out-of-bounds partition index: '
+            '%d not in [0, %d)' % (partition, n))
+      # Each input is directed into the side output that corresponds to the
+      # selected partition.
+      yield pvalue.SideOutputValue(str(partition), context.element)
+
+  def make_fn(self, fn):
+    return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)
+
+  def apply(self, pcoll):
+    n = int(self.args[0])
+    return pcoll | ParDo(
+        self.ApplyPartitionFnFn(), self.fn, *self.args,
+        **self.kwargs).with_outputs(*[str(t) for t in range(n)])
+
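A sketch of Partition with a callable partition function; unpacking the result into per-partition PCollections follows the docstring's description of the output as an ordered list:

    evens, odds = (p
                   | Create('nums', [1, 2, 3, 4])
                   | Partition('parity', lambda n, num_partitions: n % 2, 2))
    # Partition 0 receives 2 and 4, partition 1 receives 1 and 3.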
+
+class Windowing(object):
+
+  def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
+               output_time_fn=None):
+    global AccumulationMode, DefaultTrigger
+    # pylint: disable=g-import-not-at-top
+    from google.cloud.dataflow.transforms.trigger import AccumulationMode, DefaultTrigger
+    # pylint: enable=g-import-not-at-top
+    if triggerfn is None:
+      triggerfn = DefaultTrigger()
+    if accumulation_mode is None:
+      if triggerfn == DefaultTrigger():
+        accumulation_mode = AccumulationMode.DISCARDING
+      else:
+        raise ValueError(
+            'accumulation_mode must be provided for non-trivial triggers')
+    self.windowfn = windowfn
+    self.triggerfn = triggerfn
+    self.accumulation_mode = accumulation_mode
+    self.output_time_fn = output_time_fn or OutputTimeFn.OUTPUT_AT_EOW
+    self._is_default = (
+        self.windowfn == window.GlobalWindows() and
+        self.triggerfn == DefaultTrigger() and
+        self.accumulation_mode == AccumulationMode.DISCARDING and
+        self.output_time_fn == OutputTimeFn.OUTPUT_AT_EOW)
+
+  def __repr__(self):
+    return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
+                                          self.accumulation_mode,
+                                          self.output_time_fn)
+
+  def is_default(self):
+    return self._is_default
+
+
+T = typehints.TypeVariable('T')
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class WindowInto(ParDo):  # pylint: disable=g-wrong-blank-lines
+  """A window transform assigning windows to each element of a PCollection.
+
+  Transforms an input PCollection by applying a windowing function to each
+  element.  Each transformed element in the result will be a WindowedValue
+  element with the same input value and timestamp, with its new set of windows
+  determined by the windowing function.
+  """
+
+  class WindowIntoFn(DoFn):
+    """A DoFn that applies a WindowInto operation."""
+
+    def __init__(self, windowing):
+      self.windowing = windowing
+
+    def process(self, context):
+      context = WindowFn.AssignContext(context.timestamp,
+                                       element=context.element,
+                                       existing_windows=context.windows)
+      new_windows = self.windowing.windowfn.assign(context)
+      yield WindowedValue(context.element, context.timestamp, new_windows)
+
+  def __init__(self, *args, **kwargs):
+    """Initializes a WindowInto transform.
+
+    Args:
+      *args: A tuple of positional arguments.
+      **kwargs: A dictionary of keyword arguments.
+
+    The *args, **kwargs are expected to be (label, windowfn) or (windowfn).
+    The optional trigger and accumulation_mode kwargs may also be provided.
+    """
+    triggerfn = kwargs.pop('trigger', None)
+    accumulation_mode = kwargs.pop('accumulation_mode', None)
+    output_time_fn = kwargs.pop('output_time_fn', None)
+    label, windowfn = self.parse_label_and_arg(args, kwargs, 'windowfn')
+    self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
+                               output_time_fn)
+    dofn = self.WindowIntoFn(self.windowing)
+    super(WindowInto, self).__init__(label, dofn)
+
+  def get_windowing(self, unused_inputs):
+    return self.windowing
+
+  def infer_output_type(self, input_type):
+    return input_type
+
+  def apply(self, pcoll):
+    input_type = pcoll.element_type
+
+    if input_type is not None:
+      output_type = input_type
+      self.with_input_types(input_type)
+      self.with_output_types(output_type)
+    return super(WindowInto, self).apply(pcoll)
+
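A sketch assigning fixed windows to an existing PCollection pcoll, assuming the FixedWindows window function from the SDK's window module:

    from google.cloud.dataflow.transforms import window

    windowed = pcoll | WindowInto('fixed', window.FixedWindows(60))
    # Each element keeps its value and timestamp; its windows are replaced by
    # the 60-second fixed window containing that timestamp.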
+
+# Python's pickling is broken for nested classes.
+WindowIntoFn = WindowInto.WindowIntoFn
+
+
+class Flatten(PTransform):
+  """Merges several PCollections into a single PCollection.
+
+  Copies all elements in 0 or more PCollections into a single output
+  PCollection. If there are no input PCollections, the resulting PCollection
+  will be empty (but see also kwargs below).
+
+  Args:
+    label: name of this transform instance. Useful while monitoring and
+      debugging a pipeline execution.
+    **kwargs: Accepts a single named argument "pipeline", which specifies the
+      pipeline that "owns" this PTransform. Ordinarily Flatten can obtain this
+      information from one of the input PCollections, but if there are none (or
+      if there's a chance there may be none), this argument is the only way to
+      provide pipeline information and should be considered mandatory.
+  """
+
+  def __init__(self, label=None, **kwargs):
+    super(Flatten, self).__init__(label)
+    self.pipeline = kwargs.pop('pipeline', None)
+    if kwargs:
+      raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+
+  def _extract_input_pvalues(self, pvalueish):
+    try:
+      pvalueish = tuple(pvalueish)
+    except TypeError:
+      raise ValueError('Input to Flatten must be an iterable.')
+    return pvalueish, pvalueish
+
+  def apply(self, pcolls):
+    for pcoll in pcolls:
+      self._check_pcollection(pcoll)
+    return pvalue.PCollection(self.pipeline)
+
+  def get_windowing(self, inputs):
+    if not inputs:
+      # TODO(robertwb): Return something compatible with every windowing?
+      return Windowing(window.GlobalWindows())
+    else:
+      return super(Flatten, self).get_windowing(inputs)
+
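(Editorial sketch, not part of the commit.) The two ways Flatten can learn its pipeline, per the docstring above: from its inputs when there are any, or from the pipeline keyword when there may be none:

    p = Pipeline('DirectPipelineRunner')
    first = p | Create('first', [1, 2])
    second = p | Create('second', [3, 4])
    merged = (first, second) | Flatten('merge')   # pipeline inferred from the inputs
    empty = () | Flatten('empty', pipeline=p)     # no inputs: the kwarg is mandatory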
+
+class Create(PTransform):
+  """A transform that creates a PCollection from an iterable."""
+
+  def __init__(self, *args, **kwargs):
+    """Initializes a Create transform.
+
+    Args:
+      *args: A tuple of positional arguments.
+      **kwargs: A dictionary of keyword arguments.
+
+    The *args, **kwargs are expected to be (label, value) or (value).
+    """
+    label, value = self.parse_label_and_arg(args, kwargs, 'value')
+    super(Create, self).__init__(label)
+    if isinstance(value, basestring):
+      raise TypeError('PTransform Create: Refusing to treat string as '
+                      'an iterable. (string=%r)' % value)
+    elif isinstance(value, dict):
+      value = value.items()
+    self.value = tuple(value)
+
+  def infer_output_type(self, unused_input_type):
+    if not self.value:
+      return Any
+    else:
+      return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
+
+  def apply(self, pbegin):
+    assert isinstance(pbegin, pvalue.PBegin)
+    self.pipeline = pbegin.pipeline
+    return pvalue.PCollection(self.pipeline)
+
+  def get_windowing(self, unused_inputs):
+    return Windowing(window.GlobalWindows())
+
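(Editorial sketch, not part of the commit.) The constructor above normalizes its input: a dict becomes an iterable of (key, value) pairs, and a bare string is rejected rather than silently producing one element per character:

    p = Pipeline('DirectPipelineRunner')
    pairs = p | Create('from-dict', {'a': 1, 'b': 2})  # elements ('a', 1) and ('b', 2)
    # p | Create('bad', 'abc')  # raises TypeError: refuses to iterate a string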
+
+def Read(*args, **kwargs):
+  from google.cloud.dataflow import io
+  return io.Read(*args, **kwargs)
+
+
+def Write(*args, **kwargs):
+  from google.cloud.dataflow import io
+  return io.Write(*args, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/cy_combiners.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.pxd b/sdks/python/apache_beam/transforms/cy_combiners.pxd
new file mode 100644
index 0000000..d0ab833
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_combiners.pxd
@@ -0,0 +1,89 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport cython
+from libc.stdint cimport int64_t, INT64_MIN, INT64_MAX
+
+cdef double _NEG_INF, _POS_INF, _NAN
+
+
+cdef class CountAccumulator(object):
+  cdef readonly int64_t value
+  cpdef add_input(self, unused_element)
+  @cython.locals(accumulator=CountAccumulator)
+  cpdef merge(self, accumulators)
+
+cdef class SumInt64Accumulator(object):
+  cdef readonly int64_t value
+  cpdef add_input(self, int64_t element)
+  @cython.locals(accumulator=SumInt64Accumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MinInt64Accumulator(object):
+  cdef readonly int64_t value
+  cpdef add_input(self, int64_t element)
+  @cython.locals(accumulator=MinInt64Accumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MaxInt64Accumulator(object):
+  cdef readonly int64_t value
+  cpdef add_input(self, int64_t element)
+  @cython.locals(accumulator=MaxInt64Accumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MeanInt64Accumulator(object):
+  cdef readonly int64_t sum
+  cdef readonly int64_t count
+  cpdef add_input(self, int64_t element)
+  @cython.locals(accumulator=MeanInt64Accumulator)
+  cpdef merge(self, accumulators)
+
+
+cdef class SumDoubleAccumulator(object):
+  cdef readonly double value
+  cpdef add_input(self, double element)
+  @cython.locals(accumulator=SumDoubleAccumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MinDoubleAccumulator(object):
+  cdef readonly double value
+  cpdef add_input(self, double element)
+  @cython.locals(accumulator=MinDoubleAccumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MaxDoubleAccumulator(object):
+  cdef readonly double value
+  cpdef add_input(self, double element)
+  @cython.locals(accumulator=MaxDoubleAccumulator)
+  cpdef merge(self, accumulators)
+
+cdef class MeanDoubleAccumulator(object):
+  cdef readonly double sum
+  cdef readonly int64_t count
+  cpdef add_input(self, double element)
+  @cython.locals(accumulator=MeanDoubleAccumulator)
+  cpdef merge(self, accumulators)
+
+
+cdef class AllAccumulator(object):
+  cdef readonly bint value
+  cpdef add_input(self, bint element)
+  @cython.locals(accumulator=AllAccumulator)
+  cpdef merge(self, accumulators)
+
+cdef class AnyAccumulator(object):
+  cdef readonly bint value
+  cpdef add_input(self, bint element)
+  @cython.locals(accumulator=AnyAccumulator)
+  cpdef merge(self, accumulators)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/cy_combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py
new file mode 100644
index 0000000..4cc4233
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/cy_combiners.py
@@ -0,0 +1,250 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A library of basic cythonized CombineFn subclasses."""
+
+from __future__ import absolute_import
+
+from google.cloud.dataflow.transforms import core
+
+
+class AccumulatorCombineFn(core.CombineFn):
+  # singleton?
+  def create_accumulator(self):
+    return self._accumulator_type()
+  @staticmethod
+  def add_input(accumulator, element):
+    accumulator.add_input(element)
+    return accumulator
+  def merge_accumulators(self, accumulators):
+    accumulator = self._accumulator_type()
+    accumulator.merge(accumulators)
+    return accumulator
+  @staticmethod
+  def extract_output(accumulator):
+    return accumulator.extract_output()
+  def __eq__(self, other):
+    return (isinstance(other, AccumulatorCombineFn)
+            and self._accumulator_type is other._accumulator_type)
+  def __hash__(self):
+    return hash(self._accumulator_type)
+
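(Editorial sketch, not part of the commit.) AccumulatorCombineFn adapts the accumulator classes below to the CombineFn lifecycle: create an accumulator, feed it elements, merge partial accumulators, then extract the result. Using CountCombineFn, defined later in this file:

    fn = CountCombineFn()
    acc = fn.create_accumulator()
    for element in ['a', 'b', 'c']:
      acc = fn.add_input(acc, element)
    other = fn.add_input(fn.create_accumulator(), 'd')
    merged = fn.merge_accumulators([acc, other])
    assert fn.extract_output(merged) == 4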
+
+_63 = 63  # Avoid large literals in C source code.
+globals()['INT64_MAX'] = 2**_63 - 1
+globals()['INT64_MIN'] = -2**_63
+
+
+class CountAccumulator(object):
+  def __init__(self):
+    self.value = 0
+  def add_input(self, unused_element):
+    self.value += 1
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.value += accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class SumInt64Accumulator(object):
+  def __init__(self):
+    self.value = 0
+  def add_input(self, element):
+    element = int(element)
+    if not INT64_MIN <= element <= INT64_MAX:
+      raise OverflowError(element)
+    self.value += element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.value += accumulator.value
+  def extract_output(self):
+    if not INT64_MIN <= self.value <= INT64_MAX:
+      # Emulate two's-complement int64 wraparound for out-of-range sums.
+      self.value %= 2**64
+      if self.value > INT64_MAX:
+        self.value -= 2**64
+    return self.value
+
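(Editorial sketch, not part of the commit.) The wraparound in extract_output above mirrors C int64 overflow, so a sum that leaves the range comes back as the corresponding two's-complement value:

    acc = SumInt64Accumulator()
    acc.add_input(2**63 - 1)   # INT64_MAX
    acc.add_input(1)
    assert acc.extract_output() == -2**63  # INT64_MIN: wraps like int64_t addition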
+
+class MinInt64Accumulator(object):
+  def __init__(self):
+    self.value = INT64_MAX
+  def add_input(self, element):
+    element = int(element)
+    if not INT64_MIN <= element <= INT64_MAX:
+      raise OverflowError(element)
+    if element < self.value:
+      self.value = element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      if accumulator.value < self.value:
+        self.value = accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class MaxInt64Accumulator(object):
+  def __init__(self):
+    self.value = INT64_MIN
+  def add_input(self, element):
+    element = int(element)
+    if not INT64_MIN <= element <= INT64_MAX:
+      raise OverflowError(element)
+    if element > self.value:
+      self.value = element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      if accumulator.value > self.value:
+        self.value = accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class MeanInt64Accumulator(object):
+  def __init__(self):
+    self.sum = 0
+    self.count = 0
+  def add_input(self, element):
+    element = int(element)
+    if not INT64_MIN <= element <= INT64_MAX:
+      raise OverflowError(element)
+    self.sum += element
+    self.count += 1
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.sum += accumulator.sum
+      self.count += accumulator.count
+  def extract_output(self):
+    if not INT64_MIN <= self.sum <= INT64_MAX:
+      # Emulate two's-complement int64 wraparound for out-of-range sums.
+      self.sum %= 2**64
+      if self.sum > INT64_MAX:
+        self.sum -= 2**64
+    # Use true division so the mean of integers is not truncated.
+    return float(self.sum) / self.count if self.count else _NAN
+
+
+class CountCombineFn(AccumulatorCombineFn):
+  _accumulator_type = CountAccumulator
+class SumInt64Fn(AccumulatorCombineFn):
+  _accumulator_type = SumInt64Accumulator
+class MinInt64Fn(AccumulatorCombineFn):
+  _accumulator_type = MinInt64Accumulator
+class MaxInt64Fn(AccumulatorCombineFn):
+  _accumulator_type = MaxInt64Accumulator
+class MeanInt64Fn(AccumulatorCombineFn):
+  _accumulator_type = MeanInt64Accumulator
+
+
+_POS_INF = float('inf')
+_NEG_INF = float('-inf')
+_NAN = float('nan')
+
+
+class SumDoubleAccumulator(object):
+  def __init__(self):
+    self.value = 0
+  def add_input(self, element):
+    element = float(element)
+    self.value += element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.value += accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class MinDoubleAccumulator(object):
+  def __init__(self):
+    self.value = _POS_INF
+  def add_input(self, element):
+    element = float(element)
+    if element < self.value:
+      self.value = element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      if accumulator.value < self.value:
+        self.value = accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class MaxDoubleAccumulator(object):
+  def __init__(self):
+    self.value = _NEG_INF
+  def add_input(self, element):
+    element = float(element)
+    if element > self.value:
+      self.value = element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      if accumulator.value > self.value:
+        self.value = accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class MeanDoubleAccumulator(object):
+  def __init__(self):
+    self.sum = 0
+    self.count = 0
+  def add_input(self, element):
+    element = float(element)
+    self.sum += element
+    self.count += 1
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.sum += accumulator.sum
+      self.count += accumulator.count
+  def extract_output(self):
+    return self.sum / self.count if self.count else _NAN
+
+
+class SumFloatFn(AccumulatorCombineFn):
+  _accumulator_type = SumDoubleAccumulator
+class MinFloatFn(AccumulatorCombineFn):
+  _accumulator_type = MinDoubleAccumulator
+class MaxFloatFn(AccumulatorCombineFn):
+  _accumulator_type = MaxDoubleAccumulator
+class MeanFloatFn(AccumulatorCombineFn):
+  _accumulator_type = MeanDoubleAccumulator
+
+
+class AllAccumulator(object):
+  def __init__(self):
+    self.value = True
+  def add_input(self, element):
+    self.value &= not not element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.value &= accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class AnyAccumulator(object):
+  def __init__(self):
+    self.value = False
+  def add_input(self, element):
+    self.value |= not not element
+  def merge(self, accumulators):
+    for accumulator in accumulators:
+      self.value |= accumulator.value
+  def extract_output(self):
+    return self.value
+
+
+class AnyCombineFn(AccumulatorCombineFn):
+  _accumulator_type = AnyAccumulator
+
+class AllCombineFn(AccumulatorCombineFn):
+  _accumulator_type = AllAccumulator
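
(Editorial sketch, not part of the commit.) These CombineFns are intended as drop-in arguments to the combine transforms in core; the exact CombineGlobally signature is assumed here for illustration only:

    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.transforms.core import CombineGlobally, Create

    p = Pipeline('DirectPipelineRunner')
    flags = p | Create('flags', [True, False, True])
    any_true = flags | CombineGlobally('any', AnyCombineFn())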