You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:09 UTC

[34/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
new file mode 100644
index 0000000..ec87f46
--- /dev/null
+++ b/sdks/python/apache_beam/pipeline.py
@@ -0,0 +1,435 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pipeline, the top-level Dataflow object.
+
+A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG
+are transforms (PTransform objects) and the edges are values (mostly PCollection
+objects). The transforms take as inputs one or more PValues and output one or
+more PValues.
+
+The pipeline offers functionality to traverse the graph.  The actual operation
+to be executed for each node visited is specified through a runner object.
+
+Typical usage:
+
+  # Create a pipeline object using a local runner for execution.
+  pipeline = Pipeline(runner=DirectPipelineRunner())
+
+  # Add to the pipeline a "Create" transform. When executed this
+  # transform will produce a PCollection object with the specified values.
+  pcoll = pipeline.create('label', [1, 2, 3])
+
+  # run() will execute the DAG stored in the pipeline.  The execution of the
+  # nodes visited is done using the specified local runner.
+  pipeline.run()
+
+"""
+
+from __future__ import absolute_import
+
+import collections
+import logging
+import os
+import shutil
+import tempfile
+
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.runners import create_runner
+from google.cloud.dataflow.runners import PipelineRunner
+from google.cloud.dataflow.transforms import format_full_label
+from google.cloud.dataflow.transforms import ptransform
+from google.cloud.dataflow.typehints import TypeCheckError
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.utils.options import StandardOptions
+from google.cloud.dataflow.utils.options import TypeOptions
+from google.cloud.dataflow.utils.pipeline_options_validator import PipelineOptionsValidator
+
+
+class Pipeline(object):
+  """A pipeline object that manages a DAG of PValues and their PTransforms.
+
+  Conceptually the PValues are the DAG's nodes and the PTransforms computing
+  the PValues are the edges.
+
+  All the transforms applied to the pipeline must have distinct full labels.
+  If same transform instance needs to be applied then a clone should be created
+  with a new label (e.g., transform.clone('new label')).
+  """
+
+  def __init__(self, runner=None, options=None, argv=None):
+    """Initialize a pipeline object.
+
+    Args:
+      runner: An object of type 'PipelineRunner' that will be used to execute
+        the pipeline. For registered runners, the runner name can be specified,
+        otherwise a runner object must be supplied.
+      options: A configured 'PipelineOptions' object containing arguments
+        that should be used for running the Dataflow job.
+      argv: a list of arguments (such as sys.argv) to be used for building a
+        'PipelineOptions' object. This will only be used if argument 'options'
+        is None.
+
+    Raises:
+      ValueError: if either the runner or options argument is not of the
+      expected type.
+    """
+
+    if options is not None:
+      if isinstance(options, PipelineOptions):
+        self.options = options
+      else:
+        raise ValueError(
+            'Parameter options, if specified, must be of type PipelineOptions. '
+            'Received : %r', options)
+    elif argv is not None:
+      if isinstance(argv, list):
+        self.options = PipelineOptions(argv)
+      else:
+        raise ValueError(
+            'Parameter argv, if specified, must be a list. Received : %r', argv)
+    else:
+      self.options = None
+
+    if runner is None and self.options is not None:
+      runner = self.options.view_as(StandardOptions).runner
+      if runner is None:
+        runner = StandardOptions.DEFAULT_RUNNER
+        logging.info(('Missing pipeline option (runner). Executing pipeline '
+                      'using the default runner: %s.'), runner)
+
+    if isinstance(runner, str):
+      runner = create_runner(runner)
+    elif not isinstance(runner, PipelineRunner):
+      raise TypeError('Runner must be a PipelineRunner object or the '
+                      'name of a registered runner.')
+
+    # Validate pipeline options
+    if self.options is not None:
+      errors = PipelineOptionsValidator(self.options, runner).validate()
+      if errors:
+        raise ValueError(
+            'Pipeline has validations errors: \n' + '\n'.join(errors))
+
+    # Default runner to be used.
+    self.runner = runner
+    # Stack of transforms generated by nested apply() calls. The stack will
+    # contain a root node as an enclosing (parent) node for top transforms.
+    self.transforms_stack = [AppliedPTransform(None, None, '', None)]
+    # Set of transform labels (full labels) applied to the pipeline.
+    # If a transform is applied and the full label is already in the set
+    # then the transform will have to be cloned with a new label.
+    self.applied_labels = set()
+    # Store cache of views created from PCollections.  For reference, see
+    # pvalue._cache_view().
+    self._view_cache = {}
+
+  def _current_transform(self):
+    """Returns the transform currently on the top of the stack."""
+    return self.transforms_stack[-1]
+
+  def _root_transform(self):
+    """Returns the root transform of the transform stack."""
+    return self.transforms_stack[0]
+
+  def run(self):
+    """Runs the pipeline. Returns whatever our runner returns after running."""
+    if not self.options or self.options.view_as(SetupOptions).save_main_session:
+      # If this option is chosen, verify we can pickle the main session early.
+      tmpdir = tempfile.mkdtemp()
+      try:
+        pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
+      finally:
+        shutil.rmtree(tmpdir)
+    return self.runner.run(self)
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    if not exc_type:
+      self.run()
+
+  def visit(self, visitor):
+    """Visits depth-first every node of a pipeline's DAG.
+
+    Args:
+      visitor: PipelineVisitor object whose callbacks will be called for each
+        node visited. See PipelineVisitor comments.
+
+    Raises:
+      TypeError: if node is specified and is not a PValue.
+      pipeline.PipelineError: if node is specified and does not belong to this
+        pipeline instance.
+    """
+
+    visited = set()
+    self._root_transform().visit(visitor, self, visited)
+
+  def apply(self, transform, pvalueish=None):
+    """Applies a custom transform using the pvalueish specified.
+
+    Args:
+      transform: the PTranform (or callable) to apply.
+      pvalueish: the input for the PTransform (typically a PCollection).
+
+    Raises:
+      TypeError: if the transform object extracted from the argument list is
+        not a callable type or a descendant from PTransform.
+      RuntimeError: if the transform object was already applied to this pipeline
+        and needs to be cloned in order to apply again.
+    """
+    if not isinstance(transform, ptransform.PTransform):
+      transform = _CallableWrapperPTransform(transform)
+
+    full_label = format_full_label(self._current_transform(), transform)
+    if full_label in self.applied_labels:
+      raise RuntimeError(
+          'Transform "%s" does not have a stable unique label. '
+          'This will prevent updating of pipelines. '
+          'To clone a transform with a new label use: '
+          'transform.clone("NEW LABEL").'
+          % full_label)
+    self.applied_labels.add(full_label)
+
+    pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
+    try:
+      inputs = tuple(inputs)
+      for leaf_input in inputs:
+        if not isinstance(leaf_input, pvalue.PValue):
+          raise TypeError
+    except TypeError:
+      raise NotImplementedError(
+          'Unable to extract PValue inputs from %s; either %s does not accept '
+          'inputs of this format, or it does not properly override '
+          '_extract_input_values' % (pvalueish, transform))
+
+    current = AppliedPTransform(
+        self._current_transform(), transform, full_label, inputs)
+    self._current_transform().add_part(current)
+    self.transforms_stack.append(current)
+
+    if self.options is not None:
+      type_options = self.options.view_as(TypeOptions)
+    else:
+      type_options = None
+
+    if type_options is not None and type_options.pipeline_type_check:
+      transform.type_check_inputs(pvalueish)
+
+    pvalueish_result = self.runner.apply(transform, pvalueish)
+
+    if type_options is not None and type_options.pipeline_type_check:
+      transform.type_check_outputs(pvalueish_result)
+
+    for result in ptransform.GetPValues().visit(pvalueish_result):
+      assert isinstance(result, (pvalue.PValue, pvalue.DoOutputsTuple))
+
+      # Make sure we set the producer only for a leaf node in the transform DAG.
+      # This way we preserve the last transform of a composite transform as
+      # being the real producer of the result.
+      if result.producer is None:
+        result.producer = current
+      # TODO(robertwb): Multi-input, multi-output inference.
+      # TODO(robertwb): Ideally we'd do intersection here.
+      if (type_options is not None and type_options.pipeline_type_check and
+          isinstance(result, (pvalue.PCollection, pvalue.PCollectionView))
+          and not result.element_type):
+        input_element_type = (
+            inputs[0].element_type
+            if len(inputs) == 1
+            else typehints.Any)
+        type_hints = transform.get_type_hints()
+        declared_output_type = type_hints.simple_output_type(transform.label)
+        if declared_output_type:
+          input_types = type_hints.input_types
+          if input_types and input_types[0]:
+            declared_input_type = input_types[0][0]
+            result.element_type = typehints.bind_type_variables(
+                declared_output_type,
+                typehints.match_type_variables(declared_input_type,
+                                               input_element_type))
+          else:
+            result.element_type = declared_output_type
+        else:
+          result.element_type = transform.infer_output_type(input_element_type)
+
+      assert isinstance(result.producer.inputs, tuple)
+      current.add_output(result)
+
+    if (type_options is not None and
+        type_options.type_check_strictness == 'ALL_REQUIRED' and
+        transform.get_type_hints().output_types is None):
+      ptransform_name = '%s(%s)' % (transform.__class__.__name__, full_label)
+      raise TypeCheckError('Pipeline type checking is enabled, however no '
+                           'output type-hint was found for the '
+                           'PTransform %s' % ptransform_name)
+
+    current.update_input_refcounts()
+    self.transforms_stack.pop()
+    return pvalueish_result
+
+
+class _CallableWrapperPTransform(ptransform.PTransform):
+
+  def __init__(self, callee):
+    assert callable(callee)
+    super(_CallableWrapperPTransform, self).__init__(
+        label=getattr(callee, '__name__', 'Callable'))
+    self._callee = callee
+
+  def apply(self, *args, **kwargs):
+    return self._callee(*args, **kwargs)
+
+
+class PipelineVisitor(object):
+  """Visitor pattern class used to traverse a DAG of transforms.
+
+  This is an internal class used for bookkeeping by a Pipeline.
+  """
+
+  def visit_value(self, value, producer_node):
+    """Callback for visiting a PValue in the pipeline DAG.
+
+    Args:
+      value: PValue visited (typically a PCollection instance).
+      producer_node: AppliedPTransform object whose transform produced the
+        pvalue.
+    """
+    pass
+
+  def visit_transform(self, transform_node):
+    """Callback for visiting a transform node in the pipeline DAG."""
+    pass
+
+  def enter_composite_transform(self, transform_node):
+    """Callback for entering traversal of a composite transform node."""
+    pass
+
+  def leave_composite_transform(self, transform_node):
+    """Callback for leaving traversal of a composite transform node."""
+    pass
+
+
+class AppliedPTransform(object):
+  """A transform node representing an instance of applying a PTransform.
+
+  This is an internal class used for bookkeeping by a Pipeline.
+  """
+
+  def __init__(self, parent, transform, full_label, inputs):
+    self.parent = parent
+    self.transform = transform
+    # Note that we want the PipelineVisitor classes to use the full_label,
+    # inputs, side_inputs, and outputs fields from this instance instead of the
+    # ones of the PTransform instance associated with it. Doing this permits
+    # reusing PTransform instances in different contexts (apply() calls) without
+    # any interference. This is particularly useful for composite transforms.
+    self.full_label = full_label
+    self.inputs = inputs or ()
+    self.side_inputs = () if transform is None else tuple(transform.side_inputs)
+    self.outputs = {}
+    self.parts = []
+
+    # Per tag refcount dictionary for PValues for which this node is a
+    # root producer.
+    self.refcounts = collections.defaultdict(int)
+
+  def update_input_refcounts(self):
+    """Increment refcounts for all transforms providing inputs."""
+
+    def real_producer(pv):
+      real = pv.producer
+      while real.parts:
+        real = real.parts[-1]
+      return real
+
+    if not self.is_composite():
+      for main_input in self.inputs:
+        if not isinstance(main_input, pvalue.PBegin):
+          real_producer(main_input).refcounts[main_input.tag] += 1
+      for side_input in self.side_inputs:
+        real_producer(side_input).refcounts[side_input.tag] += 1
+
+  def add_output(self, output, tag=None):
+    assert (isinstance(output, pvalue.PValue) or
+            isinstance(output, pvalue.DoOutputsTuple))
+    if tag is None:
+      tag = len(self.outputs)
+    assert tag not in self.outputs
+    self.outputs[tag] = output
+
+  def add_part(self, part):
+    assert isinstance(part, AppliedPTransform)
+    self.parts.append(part)
+
+  def is_composite(self):
+    """Returns whether this is a composite transform.
+
+    A composite transform has parts (inner transforms) or isn't the
+    producer for any of its outputs. (An example of a transform that
+    is not a producer is one that returns its inputs instead.)
+    """
+    return bool(self.parts) or all(
+        pval.producer is not self for pval in self.outputs.values())
+
+  def visit(self, visitor, pipeline, visited):
+    """Visits all nodes reachable from the current node."""
+
+    for pval in self.inputs:
+      if pval not in visited and not isinstance(pval, pvalue.PBegin):
+        assert pval.producer is not None
+        pval.producer.visit(visitor, pipeline, visited)
+        # The value should be visited now since we visit outputs too.
+        assert pval in visited, pval
+
+    # Visit side inputs.
+    for pval in self.side_inputs:
+      if isinstance(pval, pvalue.PCollectionView) and pval not in visited:
+        assert pval.producer is not None
+        pval.producer.visit(visitor, pipeline, visited)
+        # The value should be visited now since we visit outputs too.
+        assert pval in visited
+        # TODO(silviuc): Is there a way to signal that we are visiting a side
+        # value? The issue is that the same PValue can be reachable through
+        # multiple paths and therefore it is not guaranteed that the value
+        # will be visited as a side value.
+
+    # Visit a composite or primitive transform.
+    if self.is_composite():
+      visitor.enter_composite_transform(self)
+      for part in self.parts:
+        part.visit(visitor, pipeline, visited)
+      visitor.leave_composite_transform(self)
+    else:
+      visitor.visit_transform(self)
+
+    # Visit the outputs (one or more). It is essential to mark as visited the
+    # tagged PCollections of the DoOutputsTuple object. A tagged PCollection is
+    # connected directly with its producer (a multi-output ParDo), but the
+    # output of such a transform is the containing DoOutputsTuple, not the
+    # PCollection inside it. Without the code below a tagged PCollection will
+    # not be marked as visited while visiting its producer.
+    for pval in self.outputs.values():
+      if isinstance(pval, pvalue.DoOutputsTuple):
+        pvals = (v for v in pval)
+      else:
+        pvals = (pval,)
+      for v in pvals:
+        if v not in visited:
+          visited.add(v)
+          visitor.visit_value(v, self)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
new file mode 100644
index 0000000..ce3bd6d
--- /dev/null
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -0,0 +1,345 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the Pipeline class."""
+
+import gc
+import logging
+import unittest
+
+from google.cloud.dataflow.io.iobase import NativeSource
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.pipeline import PipelineOptions
+from google.cloud.dataflow.pipeline import PipelineVisitor
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.pvalue import SideOutputValue
+from google.cloud.dataflow.transforms import CombinePerKey
+from google.cloud.dataflow.transforms import Create
+from google.cloud.dataflow.transforms import FlatMap
+from google.cloud.dataflow.transforms import Flatten
+from google.cloud.dataflow.transforms import Map
+from google.cloud.dataflow.transforms import PTransform
+from google.cloud.dataflow.transforms import Read
+from google.cloud.dataflow.transforms.util import assert_that, equal_to
+
+
+class FakeSource(NativeSource):
+  """Fake source returning a fixed list of values."""
+
+  class _Reader(object):
+
+    def __init__(self, vals):
+      self._vals = vals
+
+    def __enter__(self):
+      return self
+
+    def __exit__(self, exception_type, exception_value, traceback):
+      pass
+
+    def __iter__(self):
+      for v in self._vals:
+        yield v
+
+  def __init__(self, vals):
+    self._vals = vals
+
+  def reader(self):
+    return FakeSource._Reader(self._vals)
+
+
+class PipelineTest(unittest.TestCase):
+
+  def setUp(self):
+    self.runner_name = 'DirectPipelineRunner'
+
+  @staticmethod
+  def custom_callable(pcoll):
+    return pcoll | FlatMap('+1', lambda x: [x + 1])
+
+  # Some of these tests designate a runner by name, others supply a runner.
+  # This variation is just to verify that both means of runner specification
+  # work and is not related to other aspects of the tests.
+
+  class CustomTransform(PTransform):
+
+    def apply(self, pcoll):
+      return pcoll | FlatMap('+1', lambda x: [x + 1])
+
+  class Visitor(PipelineVisitor):
+
+    def __init__(self, visited):
+      self.visited = visited
+      self.enter_composite = []
+      self.leave_composite = []
+
+    def visit_value(self, value, _):
+      self.visited.append(value)
+
+    def enter_composite_transform(self, transform_node):
+      self.enter_composite.append(transform_node)
+
+    def leave_composite_transform(self, transform_node):
+      self.leave_composite.append(transform_node)
+
+  def test_create(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll = pipeline | Create('label1', [1, 2, 3])
+    assert_that(pcoll, equal_to([1, 2, 3]))
+
+    # Test if initial value is an iterator object.
+    pcoll2 = pipeline | Create('label2', iter((4, 5, 6)))
+    pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10])
+    assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
+    pipeline.run()
+
+  def test_create_singleton_pcollection(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll = pipeline | Create('label', [[1, 2, 3]])
+    assert_that(pcoll, equal_to([[1, 2, 3]]))
+    pipeline.run()
+
+  def test_read(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll = pipeline | Read('read', FakeSource([1, 2, 3]))
+    assert_that(pcoll, equal_to([1, 2, 3]))
+    pipeline.run()
+
+  def test_visit_entire_graph(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll1 = pipeline | Create('pcoll', [1, 2, 3])
+    pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1])
+    pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1])
+    pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1])
+    transform = PipelineTest.CustomTransform()
+    pcoll5 = pcoll4 | transform
+
+    visitor = PipelineTest.Visitor(visited=[])
+    pipeline.visit(visitor)
+    self.assertEqual(set([pcoll1, pcoll2, pcoll3, pcoll4, pcoll5]),
+                     set(visitor.visited))
+    self.assertEqual(set(visitor.enter_composite),
+                     set(visitor.leave_composite))
+    self.assertEqual(2, len(visitor.enter_composite))
+    self.assertEqual(visitor.enter_composite[1].transform, transform)
+    self.assertEqual(visitor.leave_composite[0].transform, transform)
+
+  def test_apply_custom_transform(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll = pipeline | Create('pcoll', [1, 2, 3])
+    result = pcoll | PipelineTest.CustomTransform()
+    assert_that(result, equal_to([2, 3, 4]))
+    pipeline.run()
+
+  def test_reuse_custom_transform_instance(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
+    pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+    transform = PipelineTest.CustomTransform()
+    pcoll1 | transform
+    with self.assertRaises(RuntimeError) as cm:
+      pipeline.apply(transform, pcoll2)
+    self.assertEqual(
+        cm.exception.message,
+        'Transform "CustomTransform" does not have a stable unique label. '
+        'This will prevent updating of pipelines. '
+        'To clone a transform with a new label use: '
+        'transform.clone("NEW LABEL").')
+
+  def test_reuse_cloned_custom_transform_instance(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
+    pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+    transform = PipelineTest.CustomTransform()
+    result1 = pcoll1 | transform
+    result2 = pcoll2 | transform.clone('new label')
+    assert_that(result1, equal_to([2, 3, 4]), label='r1')
+    assert_that(result2, equal_to([5, 6, 7]), label='r2')
+    pipeline.run()
+
+  def test_apply_custom_callable(self):
+    pipeline = Pipeline(self.runner_name)
+    pcoll = pipeline | Create('pcoll', [1, 2, 3])
+    result = pipeline.apply(PipelineTest.custom_callable, pcoll)
+    assert_that(result, equal_to([2, 3, 4]))
+    pipeline.run()
+
+  def test_transform_no_super_init(self):
+    class AddSuffix(PTransform):
+
+      def __init__(self, suffix):
+        # No call to super(...).__init__
+        self.suffix = suffix
+
+      def apply(self, pcoll):
+        return pcoll | Map(lambda x: x + self.suffix)
+
+    self.assertEqual(
+        ['a-x', 'b-x', 'c-x'],
+        sorted(['a', 'b', 'c'] | AddSuffix('-x')))
+
+  def test_cached_pvalues_are_refcounted(self):
+    """Test that cached PValues are refcounted and deleted.
+
+    The intermediary PValues computed by the workflow below contain
+    one million elements so if the refcounting does not work the number of
+    objects tracked by the garbage collector will increase by a few millions
+    by the time we execute the final Map checking the objects tracked.
+    Anything that is much larger than what we started with will fail the test.
+    """
+    def check_memory(value, count_threshold):
+      gc.collect()
+      objects_count = len(gc.get_objects())
+      if objects_count > count_threshold:
+        raise RuntimeError(
+            'PValues are not refcounted: %s, %s' % (
+                objects_count, count_threshold))
+      return value
+
+    def create_dupes(o, _):
+      yield o
+      yield SideOutputValue('side', o)
+
+    pipeline = Pipeline('DirectPipelineRunner')
+
+    gc.collect()
+    count_threshold = len(gc.get_objects()) + 10000
+    biglist = pipeline | Create('oom:create', ['x'] * 1000000)
+    dupes = (
+        biglist
+        | Map('oom:addone', lambda x: (x, 1))
+        | FlatMap('oom:dupes', create_dupes,
+                  AsIter(biglist)).with_outputs('side', main='main'))
+    result = (
+        (dupes.side, dupes.main, dupes.side)
+        | Flatten('oom:flatten')
+        | CombinePerKey('oom:combine', sum)
+        | Map('oom:check', check_memory, count_threshold))
+
+    assert_that(result, equal_to([('x', 3000000)]))
+    pipeline.run()
+    self.assertEqual(
+        pipeline.runner.debug_counters['element_counts'],
+        {
+            'oom:flatten': 3000000,
+            ('oom:combine/GroupByKey/reify_windows', None): 3000000,
+            ('oom:dupes/oom:dupes', 'side'): 1000000,
+            ('oom:dupes/oom:dupes', None): 1000000,
+            'oom:create': 1000000,
+            ('oom:addone', None): 1000000,
+            'oom:combine/GroupByKey/group_by_key': 1,
+            ('oom:check', None): 1,
+            'assert_that/singleton': 1,
+            ('assert_that/Map(match)', None): 1,
+            ('oom:combine/GroupByKey/group_by_window', None): 1,
+            ('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1})
+
+  def test_pipeline_as_context(self):
+    def raise_exception(exn):
+      raise exn
+    with self.assertRaises(ValueError):
+      with Pipeline(self.runner_name) as p:
+        # pylint: disable=expression-not-assigned
+        p | Create([ValueError]) | Map(raise_exception)
+
+  def test_eager_pipeline(self):
+    p = Pipeline('EagerPipelineRunner')
+    self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
+
+
+class DiskCachedRunnerPipelineTest(PipelineTest):
+
+  def setUp(self):
+    self.runner_name = 'DiskCachedPipelineRunner'
+
+  def test_cached_pvalues_are_refcounted(self):
+    # Takes long with disk spilling.
+    pass
+
+  def test_eager_pipeline(self):
+    # Tests eager runner only
+    pass
+
+
+class Bacon(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--slices', type=int)
+
+
+class Eggs(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--style', default='scrambled')
+
+
+class Breakfast(Bacon, Eggs):
+  pass
+
+
+class PipelineOptionsTest(unittest.TestCase):
+
+  def test_flag_parsing(self):
+    options = Breakfast(['--slices=3', '--style=sunny side up', '--ignored'])
+    self.assertEquals(3, options.slices)
+    self.assertEquals('sunny side up', options.style)
+
+  def test_keyword_parsing(self):
+    options = Breakfast(
+        ['--slices=3', '--style=sunny side up', '--ignored'],
+        slices=10)
+    self.assertEquals(10, options.slices)
+    self.assertEquals('sunny side up', options.style)
+
+  def test_attribute_setting(self):
+    options = Breakfast(slices=10)
+    self.assertEquals(10, options.slices)
+    options.slices = 20
+    self.assertEquals(20, options.slices)
+
+  def test_view_as(self):
+    generic_options = PipelineOptions(['--slices=3'])
+    self.assertEquals(3, generic_options.view_as(Bacon).slices)
+    self.assertEquals(3, generic_options.view_as(Breakfast).slices)
+
+    generic_options.view_as(Breakfast).slices = 10
+    self.assertEquals(10, generic_options.view_as(Bacon).slices)
+
+    with self.assertRaises(AttributeError):
+      generic_options.slices  # pylint: disable=pointless-statement
+
+    with self.assertRaises(AttributeError):
+      generic_options.view_as(Eggs).slices  # pylint: disable=expression-not-assigned
+
+  def test_defaults(self):
+    options = Breakfast(['--slices=3'])
+    self.assertEquals(3, options.slices)
+    self.assertEquals('scrambled', options.style)
+
+  def test_dir(self):
+    options = Breakfast()
+    self.assertEquals(
+        ['from_dictionary', 'get_all_options', 'slices', 'style', 'view_as'],
+        [attr for attr in dir(options) if not attr.startswith('_')])
+    self.assertEquals(
+        ['from_dictionary', 'get_all_options', 'style', 'view_as'],
+        [attr for attr in dir(options.view_as(Eggs))
+         if not attr.startswith('_')])
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
new file mode 100644
index 0000000..5e40706
--- /dev/null
+++ b/sdks/python/apache_beam/pvalue.py
@@ -0,0 +1,459 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""PValue, PCollection: one node of a dataflow graph.
+
+A node of a dataflow processing graph is a PValue. Currently, there is only
+one type: PCollection (a potentially very large set of arbitrary values).
+Once created, a PValue belongs to a pipeline and has an associated
+transform (of type PTransform), which describes how the value will be
+produced when the pipeline gets executed.
+"""
+
+from __future__ import absolute_import
+
+import collections
+
+
+class PValue(object):
+  """Base class for PCollection.
+
+  Dataflow users should not construct PValue objects directly in their
+  pipelines.
+
+  A PValue has the following main characteristics:
+    (1) Belongs to a pipeline. Added during object initialization.
+    (2) Has a transform that can compute the value if executed.
+    (3) Has a value which is meaningful if the transform was executed.
+  """
+
+  def __init__(self, pipeline, tag=None, element_type=None):
+    """Initializes a PValue with all arguments hidden behind keyword arguments.
+
+    Args:
+      pipeline: Pipeline object for this PValue.
+      tag: Tag of this PValue.
+      element_type: The type of this PValue.
+    """
+    self.pipeline = pipeline
+    self.tag = tag
+    self.element_type = element_type
+    # The AppliedPTransform instance for the application of the PTransform
+    # generating this PValue. The field gets initialized when a transform
+    # gets applied.
+    self.producer = None
+
+  def __str__(self):
+    return '<%s>' % self._str_internal()
+
+  def __repr__(self):
+    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+
+  def _str_internal(self):
+    return '%s transform=%s' % (
+        self.__class__.__name__,
+        self.producer.transform if self.producer else 'n/a')
+
+  def apply(self, *args, **kwargs):
+    """Applies a transform or callable to a PValue.
+
+    Args:
+      *args: positional arguments.
+      **kwargs: keyword arguments.
+
+    The method will insert the pvalue as the next argument following an
+    optional first label and a transform/callable object. It will call the
+    pipeline.apply() method with this modified argument list.
+    """
+    if isinstance(args[0], str):
+      # TODO(robertwb): Make sure labels are properly passed during
+      # ptransform construction and drop this argument.
+      args = args[1:]
+    arglist = list(args)
+    arglist.insert(1, self)
+    return self.pipeline.apply(*arglist, **kwargs)
+
+  def __or__(self, ptransform):
+    return self.pipeline.apply(ptransform, self)
+
+
+class PCollection(PValue):
+  """A multiple values (potentially huge) container.
+
+  Dataflow users should not construct PCollection objects directly in their
+  pipelines.
+  """
+
+  def __init__(self, pipeline, **kwargs):
+    """Initializes a PCollection. Do not call directly."""
+    super(PCollection, self).__init__(pipeline, **kwargs)
+
+  @property
+  def windowing(self):
+    if not hasattr(self, '_windowing'):
+      self._windowing = self.producer.transform.get_windowing(
+          self.producer.inputs)
+    return self._windowing
+
+  def __reduce_ex__(self, unused_version):
+    # Pickling a PCollection is almost always the wrong thing to do, but we
+    # can't prohibit it as it often gets implicitly picked up (e.g. as part
+    # of a closure).
+    return _InvalidUnpickledPCollection, ()
+
+
+class _InvalidUnpickledPCollection(object):
+  pass
+
+
+class PBegin(PValue):
+  """A pipeline begin marker used as input to create/read transforms.
+
+  The class is used internally to represent inputs to Create and Read
+  transforms. This allows us to have transforms that uniformly take PValue(s)
+  as inputs.
+  """
+  pass
+
+
+class PDone(PValue):
+  """PDone is the output of a transform that has a trivial result such as Write.
+  """
+  pass
+
+
+class DoOutputsTuple(object):
+  """An object grouping the multiple outputs of a ParDo or FlatMap transform."""
+
+  def __init__(self, pipeline, transform, tags, main_tag):
+    self._pipeline = pipeline
+    self._tags = tags
+    self._main_tag = main_tag
+    self._transform = transform
+    # The ApplyPTransform instance for the application of the multi FlatMap
+    # generating this value. The field gets initialized when a transform
+    # gets applied.
+    self.producer = None
+    # Dictionary of PCollections already associated with tags.
+    self._pcolls = {}
+
+  def __str__(self):
+    return '<%s>' % self._str_internal()
+
+  def __repr__(self):
+    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+
+  def _str_internal(self):
+    return '%s main_tag=%s tags=%s transform=%s' % (
+        self.__class__.__name__, self._main_tag, self._tags, self._transform)
+
+  def __iter__(self):
+    """Iterates over tags returning for each call a (tag, pvalue) pair."""
+    if self._main_tag is not None:
+      yield self[self._main_tag]
+    for tag in self._tags:
+      yield self[tag]
+
+  def __getattr__(self, tag):
+    # Special methods which may be accessed before the object is
+    # fully constructed (e.g. in unpickling).
+    if tag[:2] == tag[-2:] == '__':
+      return object.__getattr__(self, tag)
+    return self[tag]
+
+  def __getitem__(self, tag):
+    # Accept int tags so that we can look at Partition tags with the
+    # same ints that we used in the partition function.
+    # TODO(gildea): Consider requiring string-based tags everywhere.
+    # This will require a partition function that does not return ints.
+    if isinstance(tag, int):
+      tag = str(tag)
+    if tag == self._main_tag:
+      tag = None
+    elif self._tags and tag not in self._tags:
+      raise ValueError(
+          'Tag %s is neither the main tag %s nor any of the side tags %s' % (
+              tag, self._main_tag, self._tags))
+    # Check if we accessed this tag before.
+    if tag in self._pcolls:
+      return self._pcolls[tag]
+    if tag is not None:
+      self._transform.side_output_tags.add(tag)
+    pcoll = PCollection(self._pipeline, tag=tag)
+    # Transfer the producer from the DoOutputsTuple to the resulting
+    # PCollection.
+    pcoll.producer = self.producer
+    self.producer.add_output(pcoll, tag)
+    self._pcolls[tag] = pcoll
+    return pcoll
+
+
+class SideOutputValue(object):
+  """An object representing a tagged value.
+
+  ParDo, Map, and FlatMap transforms can emit values on multiple outputs which
+  are distinguished by string tags. The DoFn will return plain values
+  if it wants to emit on the main output and SideOutputValue objects
+  if it wants to emit a value on a specific tagged output.
+  """
+
+  def __init__(self, tag, value):
+    if not isinstance(tag, basestring):
+      raise TypeError(
+          'Attempting to create a SideOutputValue with non-string tag %s' % tag)
+    self.tag = tag
+    self.value = value
+
+
+class PCollectionView(PValue):
+  """An immutable view of a PCollection that can be used as a side input."""
+
+  def __init__(self, pipeline):
+    """Initializes a PCollectionView. Do not call directly."""
+    super(PCollectionView, self).__init__(pipeline)
+
+  @property
+  def windowing(self):
+    if not hasattr(self, '_windowing'):
+      self._windowing = self.producer.transform.get_windowing(
+          self.producer.inputs)
+    return self._windowing
+
+  def _view_options(self):
+    """Internal options corresponding to specific view.
+
+    Intended for internal use by runner implementations.
+
+    Returns:
+      Tuple of options for the given view.
+    """
+    return ()
+
+
+class SingletonPCollectionView(PCollectionView):
+  """A PCollectionView that contains a single object."""
+
+  def __init__(self, pipeline, has_default, default_value):
+    super(SingletonPCollectionView, self).__init__(pipeline)
+    self.has_default = has_default
+    self.default_value = default_value
+
+  def _view_options(self):
+    return (self.has_default, self.default_value)
+
+
+class IterablePCollectionView(PCollectionView):
+  """A PCollectionView that can be treated as an iterable."""
+  pass
+
+
+class ListPCollectionView(PCollectionView):
+  """A PCollectionView that can be treated as a list."""
+  pass
+
+
+class DictPCollectionView(PCollectionView):
+  """A PCollectionView that can be treated as a dict."""
+  pass
+
+
+def _get_cached_view(pipeline, key):
+  return pipeline._view_cache.get(key, None)  # pylint: disable=protected-access
+
+
+def _cache_view(pipeline, key, view):
+  pipeline._view_cache[key] = view  # pylint: disable=protected-access
+
+
+def can_take_label_as_first_argument(callee):
+  """Decorator to allow the "label" kwarg to be passed as the first argument.
+
+  For example, since AsSingleton is annotated with this decorator, this allows
+  the call "AsSingleton(pcoll, label='label1')" to be written more succinctly
+  as "AsSingleton('label1', pcoll)".
+
+  Args:
+    callee: The callable to be called with an optional label argument.
+
+  Returns:
+    Callable that allows (but does not require) a string label as its first
+    argument.
+  """
+  def _inner(maybe_label, *args, **kwargs):
+    if isinstance(maybe_label, basestring):
+      return callee(*args, label=maybe_label, **kwargs)
+    return callee(*((maybe_label,) + args), **kwargs)
+  return _inner
+
+
+def _format_view_label(pcoll):
+  # The monitoring UI doesn't like '/' character in transform labels.
+  if not pcoll.producer:
+    return str(pcoll.tag)
+  return '%s.%s' % (pcoll.producer.full_label.replace('/', '|'),
+                    pcoll.tag)
+
+
+_SINGLETON_NO_DEFAULT = object()
+
+
+@can_take_label_as_first_argument
+def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None):  # pylint: disable=invalid-name
+  """Create a SingletonPCollectionView from the contents of input PCollection.
+
+  The input PCollection should contain at most one element (per window) and the
+  resulting PCollectionView can then be used as a side input to PTransforms. If
+  the PCollectionView is empty (for a given window), the side input value will
+  be the default_value, if specified; otherwise, it will be an EmptySideInput
+  object.
+
+  Args:
+    pcoll: Input pcollection.
+    default_value: Default value for the singleton view.
+    label: Label to be specified if several AsSingleton's with different
+      defaults for the same PCollection.
+
+  Returns:
+    A singleton PCollectionView containing the element as above.
+  """
+  label = label or _format_view_label(pcoll)
+  has_default = default_value is not _SINGLETON_NO_DEFAULT
+  if not has_default:
+    default_value = None
+
+  # Don't recreate the view if it was already created.
+  hashable_default_value = ('val', default_value)
+  if not isinstance(default_value, collections.Hashable):
+    # Massage default value to treat as hash key.
+    hashable_default_value = ('id', id(default_value))
+  cache_key = (pcoll, AsSingleton, has_default, hashable_default_value)
+  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
+  if cached_view:
+    return cached_view
+
+  # Local import is required due to dependency loop; even though the
+  # implementation of this function requires concepts defined in modules that
+  # depend on pvalue, it lives in this module to reduce user workload.
+  from google.cloud.dataflow.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value,
+                                             label=label))
+  _cache_view(pcoll.pipeline, cache_key, view)
+  return view
+
+
+@can_take_label_as_first_argument
+def AsIter(pcoll, label=None):  # pylint: disable=invalid-name
+  """Create an IterablePCollectionView from the elements of input PCollection.
+
+  The contents of the given PCollection will be available as an iterable in
+  PTransforms that use the returned PCollectionView as a side input.
+
+  Args:
+    pcoll: Input pcollection.
+    label: Label to be specified if several AsIter's for the same PCollection.
+
+  Returns:
+    An iterable PCollectionView containing the elements as above.
+  """
+  label = label or _format_view_label(pcoll)
+
+  # Don't recreate the view if it was already created.
+  cache_key = (pcoll, AsIter)
+  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
+  if cached_view:
+    return cached_view
+
+  # Local import is required due to dependency loop; even though the
+  # implementation of this function requires concepts defined in modules that
+  # depend on pvalue, it lives in this module to reduce user workload.
+  from google.cloud.dataflow.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  view = (pcoll | sideinputs.ViewAsIterable(label=label))
+  _cache_view(pcoll.pipeline, cache_key, view)
+  return view
+
+
+@can_take_label_as_first_argument
+def AsList(pcoll, label=None):  # pylint: disable=invalid-name
+  """Create a ListPCollectionView from the elements of input PCollection.
+
+  The contents of the given PCollection will be available as a list-like object
+  in PTransforms that use the returned PCollectionView as a side input.
+
+  Args:
+    pcoll: Input pcollection.
+    label: Label to be specified if several AsList's for the same PCollection.
+
+  Returns:
+    A list PCollectionView containing the elements as above.
+  """
+  label = label or _format_view_label(pcoll)
+
+  # Don't recreate the view if it was already created.
+  cache_key = (pcoll, AsList)
+  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
+  if cached_view:
+    return cached_view
+
+  # Local import is required due to dependency loop; even though the
+  # implementation of this function requires concepts defined in modules that
+  # depend on pvalue, it lives in this module to reduce user workload.
+  from google.cloud.dataflow.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  view = (pcoll | sideinputs.ViewAsList(label=label))
+  _cache_view(pcoll.pipeline, cache_key, view)
+  return view
+
+
+@can_take_label_as_first_argument
+def AsDict(pcoll, label=None):  # pylint: disable=invalid-name
+  """Create a DictPCollectionView from the elements of input PCollection.
+
+  The contents of the given PCollection whose elements are 2-tuples of key and
+  value will be available as a dict-like object in PTransforms that use the
+  returned PCollectionView as a side input.
+
+  Args:
+    pcoll: Input pcollection containing 2-tuples of key and value.
+    label: Label to be specified if several AsDict's for the same PCollection.
+
+  Returns:
+    A dict PCollectionView containing the dict as above.
+  """
+  label = label or _format_view_label(pcoll)
+
+  # Don't recreate the view if it was already created.
+  cache_key = (pcoll, AsDict)
+  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
+  if cached_view:
+    return cached_view
+
+  # Local import is required due to dependency loop; even though the
+  # implementation of this function requires concepts defined in modules that
+  # depend on pvalue, it lives in this module to reduce user workload.
+  from google.cloud.dataflow.transforms import sideinputs  # pylint: disable=g-import-not-at-top
+  view = (pcoll | sideinputs.ViewAsDict(label=label))
+  _cache_view(pcoll.pipeline, cache_key, view)
+  return view
+
+
+class EmptySideInput(object):
+  """Value indicating when a singleton side input was empty.
+
+  If a PCollection was furnished as a singleton side input to a PTransform, and
+  that PCollection was empty, then this value is supplied to the DoFn in the
+  place where a value from a non-empty PCollection would have gone. This alerts
+  the DoFn that the side input PCollection was empty. Users may want to check
+  whether side input values are EmptySideInput, but they will very likely never
+  want to create new instances of this class themselves.
+  """
+  pass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
new file mode 100644
index 0000000..d3c1c44
--- /dev/null
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -0,0 +1,63 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the PValue and PCollection classes."""
+
+import unittest
+
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.pvalue import AsDict
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.pvalue import AsList
+from google.cloud.dataflow.pvalue import AsSingleton
+from google.cloud.dataflow.pvalue import PValue
+from google.cloud.dataflow.transforms import Create
+
+
+class FakePipeline(Pipeline):
+  """Fake pipeline object used to check if apply() receives correct args."""
+
+  def apply(self, *args, **kwargs):
+    self.args = args
+    self.kwargs = kwargs
+
+
+class PValueTest(unittest.TestCase):
+
+  def test_pvalue_expected_arguments(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    value = PValue(pipeline)
+    self.assertEqual(pipeline, value.pipeline)
+
+  def test_pcollectionview_not_recreated(self):
+    pipeline = Pipeline('DirectPipelineRunner')
+    value = pipeline | Create('create1', [1, 2, 3])
+    value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
+    self.assertEqual(AsSingleton(value), AsSingleton(value))
+    self.assertEqual(AsSingleton('new', value, default_value=1),
+                     AsSingleton('new', value, default_value=1))
+    self.assertNotEqual(AsSingleton(value),
+                        AsSingleton('new', value, default_value=1))
+    self.assertEqual(AsIter(value), AsIter(value))
+    self.assertEqual(AsList(value), AsList(value))
+    self.assertEqual(AsDict(value2), AsDict(value2))
+
+    self.assertNotEqual(AsSingleton(value), AsSingleton(value2))
+    self.assertNotEqual(AsIter(value), AsIter(value2))
+    self.assertNotEqual(AsList(value), AsList(value2))
+    self.assertNotEqual(AsDict(value), AsDict(value2))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/python_sdk_releases.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/python_sdk_releases.py b/sdks/python/apache_beam/python_sdk_releases.py
new file mode 100644
index 0000000..52e07aa
--- /dev/null
+++ b/sdks/python/apache_beam/python_sdk_releases.py
@@ -0,0 +1,53 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Descriptions of the versions of the SDK.
+
+This manages the features and tests supported by different versions of the
+Dataflow SDK for Python.
+
+To add feature 'foo' to a particular release, add a 'properties' value with
+'feature_foo': True. To remove feature 'foo' from a particular release, add a
+'properties' value with 'feature_foo': False. Features are cumulative and can
+be added and removed multiple times.
+
+By default, all tests are enabled. To remove test 'bar' from a particular
+release, add a 'properties' value with 'test_bar': False. To add it back to a
+subsequent release, add a 'properties' value with 'test_bar': True. Tests are
+cumulative and can be removed and added multiple times.
+
+See go/dataflow-testing for more information.
+"""
+
+OLDEST_SUPPORTED_PYTHON_SDK = 'python-0.1.4'
+
+RELEASES = [
+    {'name': 'python-0.2.7',},
+    {'name': 'python-0.2.6',},
+    {'name': 'python-0.2.5',},
+    {'name': 'python-0.2.4',},
+    {'name': 'python-0.2.3',},
+    {'name': 'python-0.2.2',},
+    {'name': 'python-0.2.1',},
+    {'name': 'python-0.2.0',},
+    {'name': 'python-0.1.5',},
+    {'name': 'python-0.1.4',},
+    {'name': 'python-0.1.3',},
+    {'name': 'python-0.1.2',},
+    {'name': 'python-0.1.1',
+     'properties': {
+         'feature_python': True,
+     }
+    },
+]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
new file mode 100644
index 0000000..06d1af4
--- /dev/null
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -0,0 +1,24 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Runner objects execute a Pipeline.
+
+This package defines runners, which are used to execute a pipeline.
+"""
+
+from google.cloud.dataflow.runners.dataflow_runner import DataflowPipelineRunner
+from google.cloud.dataflow.runners.direct_runner import DirectPipelineRunner
+from google.cloud.dataflow.runners.runner import create_runner
+from google.cloud.dataflow.runners.runner import PipelineRunner
+from google.cloud.dataflow.runners.runner import PipelineState

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
new file mode 100644
index 0000000..fa1e3d6
--- /dev/null
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -0,0 +1,28 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cdef type SideOutputValue, TimestampedValue, WindowedValue
+
+cdef class DoFnRunner(object):
+
+  cdef object dofn
+  cdef object window_fn
+  cdef object context
+  cdef object tagged_receivers
+  cdef object logger
+  cdef object step_name
+
+  cdef object main_receivers
+
+  cpdef _process_outputs(self, element, results)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
new file mode 100644
index 0000000..34e480b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/common.py
@@ -0,0 +1,181 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# cython: profile=True
+
+"""Worker operations executor."""
+
+import sys
+
+from google.cloud.dataflow.internal import util
+from google.cloud.dataflow.pvalue import SideOutputValue
+from google.cloud.dataflow.transforms import core
+from google.cloud.dataflow.transforms.window import TimestampedValue
+from google.cloud.dataflow.transforms.window import WindowedValue
+from google.cloud.dataflow.transforms.window import WindowFn
+
+
+class FakeLogger(object):
+  def PerThreadLoggingContext(self, *unused_args, **unused_kwargs):
+    return self
+  def __enter__(self):
+    pass
+  def __exit__(self, *unused_args):
+    pass
+
+
+class DoFnRunner(object):
+  """A helper class for executing ParDo operations.
+  """
+
+  def __init__(self,
+               fn,
+               args,
+               kwargs,
+               side_inputs,
+               windowing,
+               context,
+               tagged_receivers,
+               logger=None,
+               step_name=None):
+    if not args and not kwargs:
+      self.dofn = fn
+    else:
+      args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
+
+      class CurriedFn(core.DoFn):
+
+        def start_bundle(self, context):
+          return fn.start_bundle(context, *args, **kwargs)
+
+        def process(self, context):
+          return fn.process(context, *args, **kwargs)
+
+        def finish_bundle(self, context):
+          return fn.finish_bundle(context, *args, **kwargs)
+      self.dofn = CurriedFn()
+    self.window_fn = windowing.windowfn
+    self.context = context
+    self.tagged_receivers = tagged_receivers
+    self.logger = logger or FakeLogger()
+    self.step_name = step_name
+
+    # Optimize for the common case.
+    self.main_receivers = tagged_receivers[None]
+
+  def start(self):
+    self.context.set_element(None)
+    try:
+      self._process_outputs(None, self.dofn.start_bundle(self.context))
+    except BaseException as exn:
+      self.reraise_augmented(exn)
+
+  def finish(self):
+    self.context.set_element(None)
+    try:
+      self._process_outputs(None, self.dofn.finish_bundle(self.context))
+    except BaseException as exn:
+      self.reraise_augmented(exn)
+
+  def process(self, element):
+    try:
+      with self.logger.PerThreadLoggingContext(step_name=self.step_name):
+        self.context.set_element(element)
+        self._process_outputs(element, self.dofn.process(self.context))
+    except BaseException as exn:
+      self.reraise_augmented(exn)
+
+  def reraise_augmented(self, exn):
+    if getattr(exn, '_tagged_with_step', False) or not self.step_name:
+      raise
+    args = exn.args
+    if args and isinstance(args[0], str):
+      args = (args[0] + " [while running '%s']" % self.step_name,) + args[1:]
+      # Poor man's exception chaining.
+      raise type(exn), args, sys.exc_info()[2]
+    else:
+      raise
+
+  def _process_outputs(self, element, results):
+    """Dispatch the result of computation to the appropriate receivers.
+
+    A value wrapped in a SideOutputValue object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      return
+    for result in results:
+      tag = None
+      if isinstance(result, SideOutputValue):
+        tag = result.tag
+        if not isinstance(tag, basestring):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, WindowedValue):
+        windowed_value = result
+      elif element is None:
+        # Start and finish have no element from which to grab context,
+        # but may emit elements.
+        if isinstance(result, TimestampedValue):
+          value = result.value
+          timestamp = result.timestamp
+          assign_context = NoContext(value, timestamp)
+        else:
+          value = result
+          timestamp = -1
+          assign_context = NoContext(value)
+        windowed_value = WindowedValue(
+            value, timestamp, self.window_fn.assign(assign_context))
+      elif isinstance(result, TimestampedValue):
+        assign_context = WindowFn.AssignContext(
+            result.timestamp, result.value, element.windows)
+        windowed_value = WindowedValue(
+            result.value, result.timestamp,
+            self.window_fn.assign(assign_context))
+      else:
+        windowed_value = element.with_value(result)
+      if tag is None:
+        self.main_receivers.output(windowed_value)
+      else:
+        self.tagged_receivers[tag].output(windowed_value)
+
+class NoContext(WindowFn.AssignContext):
+  """An uninspectable WindowFn.AssignContext."""
+  NO_VALUE = object()
+  def __init__(self, value, timestamp=NO_VALUE):
+    self.value = value
+    self._timestamp = timestamp
+  @property
+  def timestamp(self):
+    if self._timestamp is self.NO_VALUE:
+      raise ValueError('No timestamp in this context.')
+    else:
+      return self._timestamp
+  @property
+  def existing_windows(self):
+    raise ValueError('No existing_windows in this context.')
+
+
+class DoFnState(object):
+  """Keeps track of state that DoFns want, currently, user counters.
+  """
+
+  def __init__(self, counter_factory):
+    self.step_name = ''
+    self._counter_factory = counter_factory
+
+  def counter_for(self, aggregator):
+    """Looks up the counter for this aggregator, creating one if necessary."""
+    return self._counter_factory.get_aggregator_counter(
+        self.step_name, aggregator)