Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/18 16:52:30 UTC

[2/3] incubator-beam git commit: Python in process runner with bundled execution

Python in process runner with bundled execution

Executes batch pipeline in bundles in parallel.

It includes tests that run the existing DirectRunner-related test suites
against the new runner. The runner is currently self-contained in its own
folder.
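
For reference, a minimal usage sketch of the new runner (assuming this commit
is applied; the sample transforms are illustrative only):

    from apache_beam import Pipeline
    from apache_beam.runners.inprocess import InProcessPipelineRunner
    from apache_beam.transforms import Create
    from apache_beam.transforms import FlatMap

    # run() executes the batch pipeline in parallel bundles on background
    # threads and blocks until the pipeline reaches a terminal state.
    p = Pipeline(InProcessPipelineRunner())
    p | Create('create', [1, 2, 3]) | FlatMap(lambda x: [x, x])
    p.run()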


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f64d0a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f64d0a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f64d0a4

Branch: refs/heads/python-sdk
Commit: 4f64d0a4f9e2821fc598429e49fd2ae30a3db493
Parents: b8e3a7b
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 6 15:51:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 18 09:51:43 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            |   6 +
 .../apache_beam/runners/inprocess/__init__.py   |  19 +
 .../runners/inprocess/bundle_factory.py         | 102 ++++
 .../apache_beam/runners/inprocess/clock.py      |  50 ++
 .../consumer_tracking_pipeline_visitor.py       |  59 ++
 .../consumer_tracking_pipeline_visitor_test.py  | 122 ++++
 .../inprocess/inprocess_evaluation_context.py   | 297 ++++++++++
 .../runners/inprocess/inprocess_executor.py     | 550 +++++++++++++++++++
 .../runners/inprocess/inprocess_runner.py       | 142 +++++
 .../runners/inprocess/inprocess_runner_test.py  | 117 ++++
 .../inprocess/inprocess_transform_result.py     |  60 ++
 .../inprocess/inprocess_watermark_manager.py    | 224 ++++++++
 .../runners/inprocess/transform_evaluator.py    | 543 ++++++++++++++++++
 13 files changed, 2291 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f532077..8e657d7 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -26,8 +26,10 @@ import os
 import re
 import shutil
 import tempfile
+import threading
 import time
 import zlib
+import weakref
 
 from apache_beam import coders
 from apache_beam.io import iobase
@@ -535,6 +537,10 @@ class FileSink(iobase.Sink):
         return(None, e)
       return (final_name, None)
 
+    # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
+    # child thread. (http://bugs.python.org/issue10015)
+    if not hasattr(threading.current_thread(), "_children"):
+      threading.current_thread()._children = weakref.WeakKeyDictionary()
     rename_results = ThreadPool(num_threads).map(_rename_file, rename_ops)
 
     for final_name, err in rename_results:
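
The change above works around http://bugs.python.org/issue10015: on Python
older than 2.7.5, constructing a ThreadPool from a non-main thread crashes
unless the thread already has a _children attribute. A standalone sketch of
the same workaround (run_in_child_thread is a hypothetical driver):

    import threading
    import weakref
    from multiprocessing.pool import ThreadPool

    def run_in_child_thread():
      # Pre-create the attribute that ThreadPool expects on old Pythons.
      if not hasattr(threading.current_thread(), '_children'):
        threading.current_thread()._children = weakref.WeakKeyDictionary()
      print ThreadPool(4).map(lambda x: x * x, range(10))

    t = threading.Thread(target=run_in_child_thread)
    t.start()
    t.join()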

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/__init__.py b/sdks/python/apache_beam/runners/inprocess/__init__.py
new file mode 100644
index 0000000..53e725a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Inprocess runner executes pipelines locally in a single process."""
+from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py b/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
new file mode 100644
index 0000000..d284449
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A factory that creates UncommittedBundles."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+
+
+class BundleFactory(object):
+  """BundleFactory creates output bundles to be used by transform evaluators."""
+
+  def create_bundle(self, output_pcollection):
+    return Bundle(output_pcollection)
+
+  def create_empty_committed_bundle(self, output_pcollection):
+    bundle = self.create_bundle(output_pcollection)
+    bundle.commit(None)
+    return bundle
+
+
+# A bundle represents a unit of work that will be processed by a transform.
+class Bundle(object):
+  """Part of a PCollection with output elements.
+
+  Elements are output to a bundle, which will later cause them to be processed
+  by the PTransforms that consume the PCollection this bundle is part of. A
+  bundle starts uncommitted and can have elements added to it; it must be
+  committed, which makes it immutable, before it is passed to a downstream
+  PTransform.
+  """
+
+  def __init__(self, pcollection):
+    assert isinstance(
+        pcollection, (pvalue.PCollection, pvalue.PCollectionView))
+    self._pcollection = pcollection
+    self._elements = []
+    self._committed = False
+    self._tag = None  # optional tag information for this bundle
+
+  @property
+  def elements(self):
+    """Returns iterable elements. If not committed will return a copy."""
+    if self._committed:
+      return self._elements
+    else:
+      return list(self._elements)
+
+  @property
+  def tag(self):
+    return self._tag
+
+  @tag.setter
+  def tag(self, value):
+    assert not self._tag
+    self._tag = value
+
+  @property
+  def pcollection(self):
+    """PCollection that the elements of this UncommittedBundle belong to."""
+    return self._pcollection
+
+  def add(self, element):
+    """Outputs an element to this bundle.
+
+    Args:
+      element: WindowedValue
+    """
+    assert not self._committed
+    self._elements.append(element)
+
+  def output(self, element):
+    self.add(element)
+
+  def commit(self, synchronized_processing_time):
+    """Commits this bundle.
+
+    Uncommitted bundle will become committed (immutable) after this call.
+
+    Args:
+      synchronized_processing_time: the synchronized processing time at which
+      this bundle was committed
+    """
+    assert not self._committed
+    self._committed = True
+    self._elements = tuple(self._elements)
+    self._synchronized_processing_time = synchronized_processing_time
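
To make the bundle lifecycle concrete, a minimal sketch (pcoll and
windowed_value are placeholders for a real PCollection and WindowedValue):

    from apache_beam.runners.inprocess.bundle_factory import BundleFactory

    factory = BundleFactory()
    bundle = factory.create_bundle(pcoll)  # uncommitted and mutable
    bundle.add(windowed_value)             # append WindowedValue elements
    bundle.commit(None)                    # bundle is now immutable
    elements = bundle.elements             # committed: the stored tuple itself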

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/clock.py b/sdks/python/apache_beam/runners/inprocess/clock.py
new file mode 100644
index 0000000..11e49cd
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/clock.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Clock implementations for real time processing and testing."""
+
+from __future__ import absolute_import
+
+import time
+
+
+class Clock(object):
+
+  @property
+  def now(self):
+    """Returns the number of milliseconds since epoch."""
+    return int(time.time() * 1000)
+
+
+class MockClock(Clock):
+  """Mock clock implementation for testing."""
+
+  def __init__(self, now_in_ms):
+    self._now_in_ms = now_in_ms
+
+  @property
+  def now(self):
+    return self._now_in_ms
+
+  @now.setter
+  def now(self, value_in_ms):
+    assert value_in_ms >= self._now_in_ms
+    self._now_in_ms = value_in_ms
+
+  def advance(self, duration_in_ms):
+    assert duration_in_ms >= 0
+    self._now_in_ms += duration_in_ms
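
A short sketch of the MockClock contract (times are milliseconds since epoch):

    from apache_beam.runners.inprocess.clock import MockClock

    clock = MockClock(1000)
    assert clock.now == 1000
    clock.advance(500)   # move time forward by 500 ms
    assert clock.now == 1500
    clock.now = 2000     # the setter only allows time to move forward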

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
new file mode 100644
index 0000000..6f1757a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""ConsumerTrackingPipelineVisitor, a PipelineVisitor object."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.pipeline import PipelineVisitor
+
+
+class ConsumerTrackingPipelineVisitor(PipelineVisitor):
+  """Visitor for extracting value-consumer relations from the graph.
+
+  Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This
+  is used to schedule consuming PTransforms to consume input after the upstream
+  transform has produced and committed output.
+  """
+
+  def __init__(self):
+    self.value_to_consumers = {}  # Map from PValue to [AppliedPTransform].
+    self.root_transforms = set()  # set of (root) AppliedPTransforms.
+    self.views = []               # list of PCollectionViews.
+    self.step_names = {}          # Map from AppliedPTransform to String.
+
+    self._num_transforms = 0
+
+  def visit_value(self, value, producer_node):
+    if value:
+      if isinstance(value, pvalue.PCollectionView):
+        self.views.append(value)
+
+  def visit_transform(self, applied_ptransform):
+    inputs = applied_ptransform.inputs
+    if inputs:
+      for input_value in inputs:
+        if isinstance(input_value, pvalue.PBegin):
+          self.root_transforms.add(applied_ptransform)
+        if input_value not in self.value_to_consumers:
+          self.value_to_consumers[input_value] = []
+        self.value_to_consumers[input_value].append(applied_ptransform)
+    else:
+      self.root_transforms.add(applied_ptransform)
+    self.step_names[applied_ptransform] = 's%d' % self._num_transforms
+    self._num_transforms += 1
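
The visitor is driven the same way the runner drives it in
inprocess_runner.py below; a sketch:

    from apache_beam import Pipeline
    from apache_beam.runners.inprocess import InProcessPipelineRunner
    from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor

    pipeline = Pipeline(InProcessPipelineRunner())
    # ... apply transforms to the pipeline ...
    visitor = ConsumerTrackingPipelineVisitor()
    pipeline.visit(visitor)
    # visitor.root_transforms, visitor.value_to_consumers,
    # visitor.step_names and visitor.views are now populated.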

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
new file mode 100644
index 0000000..b3b2968
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for consumer_tracking_pipeline_visitor."""
+
+import logging
+import unittest
+
+from apache_beam import pvalue
+from apache_beam.io import Read
+from apache_beam.io import TextFileSource
+from apache_beam.pipeline import Pipeline
+from apache_beam.pvalue import AsList
+from apache_beam.runners.inprocess import InProcessPipelineRunner
+from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
+from apache_beam.transforms import CoGroupByKey
+from apache_beam.transforms import Create
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
+from apache_beam.transforms import Flatten
+from apache_beam.transforms import ParDo
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+# pylint: disable=pointless-statement
+
+
+class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
+
+  def setUp(self):
+    self.pipeline = Pipeline(InProcessPipelineRunner())
+    self.visitor = ConsumerTrackingPipelineVisitor()
+
+  def test_root_transforms(self):
+    root_create = Create('create', [[1, 2, 3]])
+    root_read = Read('read', TextFileSource('/tmp/somefile'))
+    root_flatten = Flatten('flatten', pipeline=self.pipeline)
+
+    pbegin = pvalue.PBegin(self.pipeline)
+    pcoll_create = pbegin | root_create
+    pbegin | root_read
+    pcoll_create | FlatMap(lambda x: x)
+    [] | root_flatten
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(root_transforms, sorted(
+        [root_read, root_create, root_flatten]))
+
+    pbegin_consumers = sorted(
+        [c.transform for c in self.visitor.value_to_consumers[pbegin]])
+    self.assertEqual(pbegin_consumers, sorted([root_read, root_create]))
+    self.assertEqual(len(self.visitor.step_names), 4)
+
+  def test_side_inputs(self):
+
+    class SplitNumbersFn(DoFn):
+
+      def process(self, context):
+        if context.element < 0:
+          yield pvalue.SideOutputValue('tag_negative', context.element)
+        else:
+          yield context.element
+
+    class ProcessNumbersFn(DoFn):
+
+      def process(self, context, negatives):
+        yield context.element
+
+    root_create = Create('create', [[-1, 2, 3]])
+
+    result = (self.pipeline
+              | root_create
+              | ParDo(SplitNumbersFn()).with_outputs('tag_negative',
+                                                     main='positive'))
+    positive, negative = result
+    positive | ParDo(ProcessNumbersFn(), AsList(negative))
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(root_transforms, sorted([root_create]))
+    self.assertEqual(len(self.visitor.step_names), 4)
+    self.assertEqual(len(self.visitor.views), 1)
+    self.assertTrue(isinstance(self.visitor.views[0],
+                               pvalue.ListPCollectionView))
+
+  def test_co_group_by_key(self):
+    emails = self.pipeline | Create('email', [('joe', 'joe@example.com')])
+    phones = self.pipeline | Create('phone', [('mary', '111-222-3333')])
+    {'emails': emails, 'phones': phones} | CoGroupByKey()
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(len(root_transforms), 2)
+    self.assertGreater(
+        len(self.visitor.step_names), 3)  # 2 creates + expanded CoGBK
+    self.assertEqual(len(self.visitor.views), 0)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
new file mode 100644
index 0000000..9c8b695
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""InProcessEvaluationContext tracks global state, triggers and watermarks."""
+
+from __future__ import absolute_import
+
+import collections
+import threading
+
+from apache_beam.pvalue import DictPCollectionView
+from apache_beam.pvalue import EmptySideInput
+from apache_beam.pvalue import IterablePCollectionView
+from apache_beam.pvalue import ListPCollectionView
+from apache_beam.pvalue import SingletonPCollectionView
+from apache_beam.runners.inprocess.clock import Clock
+from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
+from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
+from apache_beam.utils import counters
+
+
+class _InProcessExecutionContext(object):
+
+  def __init__(self, watermarks, existing_state):
+    self._watermarks = watermarks
+    self._existing_state = existing_state
+
+  @property
+  def watermarks(self):
+    return self._watermarks
+
+  @property
+  def existing_state(self):
+    return self._existing_state
+
+
+class _InProcessSideInputView(object):
+
+  def __init__(self, view):
+    self._view = view
+    self.callable_queue = collections.deque()
+    self.value = None
+    self.has_result = False
+
+
+class _InProcessSideInputsContainer(object):
+  """An in-process container for PCollectionViews.
+
+  It provides methods for blocking until a side input is available and for
+  writing to a side input.
+  """
+
+  def __init__(self, views):
+    self._lock = threading.Lock()
+    self._views = {}
+    for view in views:
+      self._views[view] = _InProcessSideInputView(view)
+
+  def get_value_or_schedule_after_output(self, pcollection_view, task):
+    with self._lock:
+      view = self._views[pcollection_view]
+      if not view.has_result:
+        view.callable_queue.append(task)
+        task.blocked = True
+      return (view.has_result, view.value)
+
+  def set_value_and_get_callables(self, pcollection_view, values):
+    with self._lock:
+      view = self._views[pcollection_view]
+      assert not view.has_result
+      assert view.value is None
+      assert view.callable_queue is not None
+      view.value = self._pvalue_to_value(pcollection_view, values)
+      result = tuple(view.callable_queue)
+      for task in result:
+        task.blocked = False
+      view.callable_queue = None
+      view.has_result = True
+      return result
+
+  def _pvalue_to_value(self, view, values):
+    """Given a PCollectionView, returns the associated value in requested form.
+
+    Args:
+      view: PCollectionView for the requested side input.
+      values: Iterable values associated with the side input.
+
+    Returns:
+      The side input in its requested form.
+
+    Raises:
+      ValueError: If values cannot be converted into the requested form.
+    """
+    if isinstance(view, SingletonPCollectionView):
+      has_default, default_value = view._view_options()  # pylint: disable=protected-access
+      if len(values) == 0:
+        if has_default:
+          result = default_value
+        else:
+          result = EmptySideInput()
+      elif len(values) == 1:
+        result = values[0].value
+      else:
+        raise ValueError(
+            ('PCollection with more than one element accessed as '
+             'a singleton view: %s.') % view)
+    elif isinstance(view, IterablePCollectionView):
+      result = [v.value for v in values]
+    elif isinstance(view, ListPCollectionView):
+      result = [v.value for v in values]
+    elif isinstance(view, DictPCollectionView):
+      result = dict(v.value for v in values)
+    else:
+      raise NotImplementedError
+    return result
+
+
+class InProcessEvaluationContext(object):
+  """Evaluation context with the global state information of the pipeline.
+
+  The evaluation context for a specific pipeline being executed by the
+  InProcessPipelineRunner. Contains state shared within the execution across all
+  transforms.
+
+  InProcessEvaluationContext contains shared state for an execution of the
+  InProcessPipelineRunner that can be used while evaluating a PTransform. This
+  consists of views into underlying state and watermark implementations, access
+  to read and write PCollectionViews, and constructing counter sets and
+  execution contexts. This includes executing callbacks asynchronously when
+  state changes to the appropriate point (e.g. when a PCollectionView is
+  requested and known to be empty).
+
+  InProcessEvaluationContext also handles results by committing finalized
+  bundles based on the current global state and updating the global state
+  appropriately. This includes updating the per-(step,key) state, updating
+  global watermarks, and executing any callbacks that can be executed.
+  """
+
+  def __init__(self, pipeline_options, bundle_factory, root_transforms,
+               value_to_consumers, step_names, views):
+    self.pipeline_options = pipeline_options
+    self._bundle_factory = bundle_factory
+    self._root_transforms = root_transforms
+    self._value_to_consumers = value_to_consumers
+    self._step_names = step_names
+    self.views = views
+
+    # AppliedPTransform -> Evaluator specific state objects
+    self._application_state_internals = {}
+    self._watermark_manager = InProcessWatermarkManager(
+        Clock(), root_transforms, value_to_consumers)
+    self._side_inputs_container = _InProcessSideInputsContainer(views)
+    self._pending_unblocked_tasks = []
+    self._counter_factory = counters.CounterFactory()
+    self._cache = None
+
+    self._lock = threading.Lock()
+
+  def use_pvalue_cache(self, cache):
+    assert not self._cache
+    self._cache = cache
+
+  @property
+  def has_cache(self):
+    return self._cache is not None
+
+  def append_to_cache(self, applied_ptransform, tag, elements):
+    with self._lock:
+      assert self._cache
+      self._cache.append(applied_ptransform, tag, elements)
+
+  def is_root_transform(self, applied_ptransform):
+    return applied_ptransform in self._root_transforms
+
+  def handle_result(
+      self, completed_bundle, completed_timers, result):
+    """Handle the provided result produced after evaluating the input bundle.
+
+    Handle the provided InProcessTransformResult, produced after evaluating
+    the provided committed bundle (potentially None, if the result of a root
+    PTransform).
+
+    The result is the output of running the transform contained in the
+    InProcessTransformResult on the contents of the provided bundle.
+
+    Args:
+      completed_bundle: the bundle that was processed to produce the result.
+      completed_timers: the timers that were delivered to produce the
+        completed_bundle.
+      result: the InProcessTransformResult of evaluating the input bundle.
+
+    Returns:
+      The committed bundles contained within the handled result.
+    """
+    with self._lock:
+      committed_bundles = self._commit_bundles(result.output_bundles)
+      self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, result.watermark_hold)
+
+      # If the result is for a view, update side inputs container.
+      if (result.output_bundles
+          and result.output_bundles[0].pcollection in self.views):
+        if committed_bundles:
+          assert len(committed_bundles) == 1
+          side_input_result = committed_bundles[0].elements
+        else:
+          side_input_result = []
+        tasks = self._side_inputs_container.set_value_and_get_callables(
+            result.output_bundles[0].pcollection, side_input_result)
+        self._pending_unblocked_tasks.extend(tasks)
+
+      if result.counters:
+        for counter in result.counters:
+          merged_counter = self._counter_factory.get_counter(
+              counter.name, counter.combine_fn)
+          merged_counter.accumulator.merge([counter.accumulator])
+
+      self._application_state_internals[result.transform] = result.state
+      return committed_bundles
+
+  def get_aggregator_values(self, aggregator_or_name):
+    return self._counter_factory.get_aggregator_values(aggregator_or_name)
+
+  def schedule_pending_unblocked_tasks(self, executor_service):
+    if self._pending_unblocked_tasks:
+      with self._lock:
+        for task in self._pending_unblocked_tasks:
+          executor_service.submit(task)
+        self._pending_unblocked_tasks = []
+
+  def _commit_bundles(self, uncommitted_bundles):
+    """Commits bundles and returns a immutable set of committed bundles."""
+    for in_progress_bundle in uncommitted_bundles:
+      producing_applied_ptransform = in_progress_bundle.pcollection.producer
+      watermarks = self._watermark_manager.get_watermarks(
+          producing_applied_ptransform)
+      in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
+    return tuple(uncommitted_bundles)
+
+  def get_execution_context(self, applied_ptransform):
+    return _InProcessExecutionContext(
+        self._watermark_manager.get_watermarks(applied_ptransform),
+        self._application_state_internals.get(applied_ptransform))
+
+  def create_bundle(self, output_pcollection):
+    """Create an uncommitted bundle for the specified PCollection."""
+    return self._bundle_factory.create_bundle(output_pcollection)
+
+  def create_empty_committed_bundle(self, output_pcollection):
+    """Create empty bundle useful for triggering evaluation."""
+    return self._bundle_factory.create_empty_committed_bundle(
+        output_pcollection)
+
+  def extract_fired_timers(self):
+    return self._watermark_manager.extract_fired_timers()
+
+  def is_done(self, transform=None):
+    """Checks completion of a step or the pipeline.
+
+    Args:
+      transform: AppliedPTransform to check for completion.
+
+    Returns:
+      True if the step will not produce additional output. If transform is
+      None, returns True only if all steps are done.
+    """
+    if transform:
+      return self._is_transform_done(transform)
+    else:
+      for applied_ptransform in self._step_names:
+        if not self._is_transform_done(applied_ptransform):
+          return False
+      return True
+
+  def _is_transform_done(self, transform):
+    tw = self._watermark_manager.get_watermarks(transform)
+    return tw.output_watermark == InProcessWatermarkManager.WATERMARK_POS_INF
+
+  def get_value_or_schedule_after_output(self, pcollection_view, task):
+    assert isinstance(task, TransformExecutor)
+    return self._side_inputs_container.get_value_or_schedule_after_output(
+        pcollection_view, task)
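
The side inputs container implements a schedule-or-return protocol. A
TransformExecutor (defined in inprocess_executor.py below) consumes it via
the method above; a hypothetical helper showing the protocol:

    def read_side_input(evaluation_context, side_input, task):
      # Returns the side input value, or None after queueing the task; the
      # monitor task resubmits the queued task once the producing transform
      # commits its output.
      has_result, value = (
          evaluation_context.get_value_or_schedule_after_output(
              side_input, task))
      return value if has_result else None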

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py b/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
new file mode 100644
index 0000000..2136855
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
@@ -0,0 +1,550 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An executor that schedules and executes applied ptransforms."""
+
+from __future__ import absolute_import
+
+import collections
+import logging
+import Queue
+import threading
+import traceback
+from weakref import WeakValueDictionary
+
+
+class ExecutorService(object):
+  """Thread pool for executing tasks in parallel."""
+
+  class CallableTask(object):
+
+    def __call__(self):
+      pass
+
+    @property
+    def name(self):
+      return None
+
+  class ExecutorServiceWorker(threading.Thread):
+    """Worker thread for executing a single task at a time."""
+
+    # Time in seconds to block while waiting for an item from the queue.
+    TIMEOUT = 5
+
+    def __init__(self, queue, index):
+      super(ExecutorService.ExecutorServiceWorker, self).__init__()
+      self.queue = queue
+      self._index = index
+      self._default_name = 'ExecutorServiceWorker-' + str(index)
+      self._update_name()
+      self.shutdown_requested = False
+      self.start()
+
+    def _update_name(self, task=None):
+      if task and task.name:
+        name = task.name
+      else:
+        name = self._default_name
+      self.name = 'Thread: %d, %s (%s)' % (
+          self._index, name, 'executing' if task else 'idle')
+
+    def _get_task_or_none(self):
+      try:
+        # Do not block indefinitely, otherwise we may never act on a
+        # requested shutdown.
+        return self.queue.get(
+            timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT)
+      except Queue.Empty:
+        return None
+
+    def run(self):
+      while not self.shutdown_requested:
+        task = self._get_task_or_none()
+        if task:
+          try:
+            if not self.shutdown_requested:
+              self._update_name(task)
+              task()
+              self._update_name()
+          finally:
+            self.queue.task_done()
+
+    def shutdown(self):
+      self.shutdown_requested = True
+
+  def __init__(self, num_workers):
+    self.queue = Queue.Queue()
+    self.workers = [ExecutorService.ExecutorServiceWorker(
+        self.queue, i) for i in range(num_workers)]
+    self.shutdown_requested = False
+
+  def submit(self, task):
+    assert isinstance(task, ExecutorService.CallableTask)
+    if not self.shutdown_requested:
+      self.queue.put(task)
+
+  def await_completion(self):
+    for worker in self.workers:
+      worker.join()
+
+  def shutdown(self):
+    self.shutdown_requested = True
+
+    for worker in self.workers:
+      worker.shutdown()
+
+    # Consume all the remaining items in the queue
+    while not self.queue.empty():
+      try:
+        self.queue.get_nowait()
+        self.queue.task_done()
+      except Queue.Empty:
+        continue
+    # All existing threads will eventually terminate (after they complete their
+    # last task).
+
+
+class TransformEvaluationState(object):
+
+  def __init__(self, executor_service, scheduled):
+    self.executor_service = executor_service
+    self.scheduled = scheduled
+
+  def schedule(self, work):
+    self.scheduled.add(work)
+    self.executor_service.submit(work)
+
+  def complete(self, completed_work):
+    self.scheduled.remove(completed_work)
+
+
+class ParallelEvaluationState(TransformEvaluationState):
+  """A TransformEvaluationState with unlimited parallelism.
+
+  Any TransformExecutor scheduled will be immediately submitted to the
+  ExecutorService.
+
+  A principal use of this is for evaluators that can generate output bundles
+  only using the input bundle (e.g. ParDo).
+  """
+  pass
+
+
+class SerialEvaluationState(TransformEvaluationState):
+  """A TransformEvaluationState with a single work queue.
+
+  Any TransformExecutor scheduled will be placed on the work queue. Only one
+  item of work will be submitted to the ExecutorService at any time.
+
+  A principal use of this is for evaluators that keep global state, such as
+  GroupByKeyOnly.
+  """
+
+  def __init__(self, executor_service, scheduled):
+    super(SerialEvaluationState, self).__init__(executor_service, scheduled)
+    self.serial_queue = collections.deque()
+    self.currently_evaluating = None
+    self._lock = threading.Lock()
+
+  def complete(self, completed_work):
+    self._update_currently_evaluating(None, completed_work)
+    super(SerialEvaluationState, self).complete(completed_work)
+
+  def schedule(self, new_work):
+    self._update_currently_evaluating(new_work, None)
+
+  def _update_currently_evaluating(self, new_work, completed_work):
+    with self._lock:
+      if new_work:
+        self.serial_queue.append(new_work)
+      if completed_work:
+        assert self.currently_evaluating == completed_work
+        self.currently_evaluating = None
+      if self.serial_queue and not self.currently_evaluating:
+        next_work = self.serial_queue.pop()
+        self.currently_evaluating = next_work
+        super(SerialEvaluationState, self).schedule(next_work)
+
+
+class TransformExecutorServices(object):
+  """Schedules and completes TransformExecutors.
+
+  Controls concurrency as appropriate for the applied transform that each
+  executor serves.
+  """
+
+  def __init__(self, executor_service):
+    self._executor_service = executor_service
+    self._scheduled = set()
+    self._parallel = ParallelEvaluationState(
+        self._executor_service, self._scheduled)
+    self._serial_cache = WeakValueDictionary()
+
+  def parallel(self):
+    return self._parallel
+
+  def serial(self, step):
+    cached = self._serial_cache.get(step)
+    if not cached:
+      cached = SerialEvaluationState(self._executor_service, self._scheduled)
+      self._serial_cache[step] = cached
+    return cached
+
+  @property
+  def executors(self):
+    return frozenset(self._scheduled)
+
+
+class _CompletionCallback(object):
+  """The default completion callback.
+
+  The default completion callback is used to complete transform evaluations
+  that are triggered due to the arrival of elements from an upstream transform,
+  or for a source transform.
+  """
+
+  def __init__(self, evaluation_context, all_updates, timers=None):
+    self._evaluation_context = evaluation_context
+    self._all_updates = all_updates
+    self._timers = timers
+
+  def handle_result(self, input_committed_bundle, transform_result):
+    output_committed_bundles = self._evaluation_context.handle_result(
+        input_committed_bundle, self._timers, transform_result)
+    for output_committed_bundle in output_committed_bundles:
+      self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate(
+          output_committed_bundle, None))
+    return output_committed_bundles
+
+  def handle_exception(self, exception):
+    self._all_updates.offer(
+        _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
+
+
+class _TimerCompletionCallback(_CompletionCallback):
+
+  def __init__(self, evaluation_context, all_updates, timers):
+    super(_TimerCompletionCallback, self).__init__(
+        evaluation_context, all_updates, timers)
+
+
+class TransformExecutor(ExecutorService.CallableTask):
+  """TransformExecutor will evaluate a bundle using an applied ptransform.
+
+  A CallableTask responsible for constructing a TransformEvaluator and
+  evaluating it on some bundle of input, and registering the result using the
+  completion callback.
+  """
+
+  def __init__(self, transform_evaluator_registry, evaluation_context,
+               input_bundle, applied_transform, completion_callback,
+               transform_evaluation_state):
+    self._transform_evaluator_registry = transform_evaluator_registry
+    self._evaluation_context = evaluation_context
+    self._input_bundle = input_bundle
+    self._applied_transform = applied_transform
+    self._completion_callback = completion_callback
+    self._transform_evaluation_state = transform_evaluation_state
+    self._side_input_values = {}
+    self.blocked = False
+    self._call_count = 0
+
+  def __call__(self):
+    self._call_count += 1
+    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+
+    for side_input in self._applied_transform.side_inputs:
+      if side_input not in self._side_input_values:
+        has_result, value = (
+            self._evaluation_context.get_value_or_schedule_after_output(
+                side_input, self))
+        if not has_result:
+          # Monitor task will reschedule this executor once the side input is
+          # available.
+          return
+        self._side_input_values[side_input] = value
+
+    side_input_values = [self._side_input_values[side_input]
+                         for side_input in self._applied_transform.side_inputs]
+
+    try:
+      evaluator = self._transform_evaluator_registry.for_application(
+          self._applied_transform, self._input_bundle, side_input_values)
+
+      if self._input_bundle:
+        for value in self._input_bundle.elements:
+          evaluator.process_element(value)
+
+      result = evaluator.finish_bundle()
+
+      if self._evaluation_context.has_cache:
+        for uncommitted_bundle in result.output_bundles:
+          self._evaluation_context.append_to_cache(
+              self._applied_transform, uncommitted_bundle.tag,
+              uncommitted_bundle.elements)
+        undeclared_tag_values = result.undeclared_tag_values
+        if undeclared_tag_values:
+          for tag, value in undeclared_tag_values.iteritems():
+            self._evaluation_context.append_to_cache(
+                self._applied_transform, tag, value)
+
+      self._completion_callback.handle_result(self._input_bundle, result)
+      return result
+    except Exception as e:  # pylint: disable=broad-except
+      logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True)
+      self._completion_callback.handle_exception(e)
+    finally:
+      self._transform_evaluation_state.complete(self)
+
+
+class InProcessExecutor(object):
+
+  def __init__(self, *args, **kwargs):
+    self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs)
+
+  def start(self, roots):
+    self._executor.start(roots)
+
+  def await_completion(self):
+    self._executor.await_completion()
+
+
+class _ExecutorServiceParallelExecutor(object):
+  """An internal implementation for InProcessExecutor."""
+
+  NUM_WORKERS = 1
+
+  def __init__(self, value_to_consumers, transform_evaluator_registry,
+               evaluation_context):
+    self.executor_service = ExecutorService(
+        _ExecutorServiceParallelExecutor.NUM_WORKERS)
+    self.transform_executor_services = TransformExecutorServices(
+        self.executor_service)
+    self.value_to_consumers = value_to_consumers
+    self.transform_evaluator_registry = transform_evaluator_registry
+    self.evaluation_context = evaluation_context
+    self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
+        _ExecutorServiceParallelExecutor.ExecutorUpdate)
+    self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
+        _ExecutorServiceParallelExecutor.VisibleExecutorUpdate)
+    self.default_completion_callback = _CompletionCallback(
+        evaluation_context, self.all_updates)
+
+  def start(self, roots):
+    self.root_nodes = frozenset(roots)
+    self.executor_service.submit(
+        _ExecutorServiceParallelExecutor._MonitorTask(self))
+
+  def await_completion(self):
+    update = self.visible_updates.take()
+    try:
+      if update.exception:
+        raise update.exception
+    finally:
+      self.executor_service.shutdown()
+
+  def schedule_consumers(self, committed_bundle):
+    if committed_bundle.pcollection in self.value_to_consumers:
+      consumers = self.value_to_consumers[committed_bundle.pcollection]
+      for applied_ptransform in consumers:
+        self.schedule_consumption(applied_ptransform, committed_bundle,
+                                  self.default_completion_callback)
+
+  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+                           on_complete):
+    """Schedules evaluation of the given bundle with the transform."""
+    assert all([consumer_applied_transform, on_complete])
+    assert committed_bundle or consumer_applied_transform in self.root_nodes
+    if (committed_bundle
+        and self.transform_evaluator_registry.should_execute_serially(
+            consumer_applied_transform)):
+      transform_executor_service = self.transform_executor_services.serial(
+          consumer_applied_transform)
+    else:
+      transform_executor_service = self.transform_executor_services.parallel()
+
+    transform_executor = TransformExecutor(
+        self.transform_evaluator_registry, self.evaluation_context,
+        committed_bundle, consumer_applied_transform, on_complete,
+        transform_executor_service)
+    transform_executor_service.schedule(transform_executor)
+
+  class _TypedUpdateQueue(object):
+    """Type checking update queue with blocking and non-blocking operations."""
+
+    def __init__(self, item_type):
+      self._item_type = item_type
+      self._queue = Queue.Queue()
+
+    def poll(self):
+      try:
+        item = self._queue.get_nowait()
+        self._queue.task_done()
+        return item
+      except Queue.Empty:
+        return None
+
+    def take(self):
+      item = self._queue.get()
+      self._queue.task_done()
+      return item
+
+    def offer(self, item):
+      assert isinstance(item, self._item_type)
+      self._queue.put_nowait(item)
+
+  class ExecutorUpdate(object):
+    """An internal status update on the state of the executor."""
+
+    def __init__(self, produced_bundle=None, exception=None):
+      # Exactly one of these must be non-None.
+      assert bool(produced_bundle) != bool(exception)
+      self.committed_bundle = produced_bundle
+      self.exception = exception
+
+  class VisibleExecutorUpdate(object):
+    """An update of interest to the user.
+
+    Used for awaiting the completion to decide whether to return normally or
+    raise an exception.
+    """
+
+    def __init__(self, exception=None):
+      self.finished = exception is not None
+      self.exception = exception
+
+  class _MonitorTask(ExecutorService.CallableTask):
+    """MonitorTask continuously runs to ensure that pipeline makes progress."""
+
+    def __init__(self, executor):
+      self._executor = executor
+
+    @property
+    def name(self):
+      return 'monitor'
+
+    def __call__(self):
+      try:
+        update = self._executor.all_updates.poll()
+        while update:
+          if update.committed_bundle:
+            self._executor.schedule_consumers(update.committed_bundle)
+          else:
+            assert update.exception
+            logging.warning('A task failed with exception.\n %s',
+                            update.exception)
+            self._executor.visible_updates.offer(
+                _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                    update.exception))
+          update = self._executor.all_updates.poll()
+        self._executor.evaluation_context.schedule_pending_unblocked_tasks(
+            self._executor.executor_service)
+        self._add_work_if_necessary(self._fire_timers())
+      except Exception as e:  # pylint: disable=broad-except
+        logging.error('Monitor task died due to exception.\n %s', e)
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
+      finally:
+        if not self._should_shutdown():
+          self._executor.executor_service.submit(self)
+
+    def _should_shutdown(self):
+      """_should_shutdown checks whether pipeline is completed or not.
+
+      It will check for successful completion by checking the watermarks of all
+      transforms. If they all reached the maximum watermark it means that
+      pipeline successfully reached to completion.
+
+      If the above is not true, it will check that at least one executor is
+      making progress. Otherwise pipeline will be declared stalled.
+
+      If the pipeline reached to a terminal state as explained above
+      _should_shutdown will request executor to gracefully shutdown.
+
+      Returns:
+        True if pipeline reached a terminal state and monitor task could finish.
+        Otherwise monitor task should schedule itself again for future
+        execution.
+      """
+      if self._executor.evaluation_context.is_done():
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
+        self._executor.executor_service.shutdown()
+        return True
+      elif not self._is_executing():
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                Exception('Monitor task detected a pipeline stall.')))
+        self._executor.executor_service.shutdown()
+        return True
+      return False
+
+    def _fire_timers(self):
+      """Schedules triggered consumers if any timers fired.
+
+      Returns:
+        True if timers fired.
+      """
+      fired_timers = self._executor.evaluation_context.extract_fired_timers()
+      for applied_ptransform in fired_timers:
+        # Use an empty committed bundle, just to trigger execution.
+        empty_bundle = (
+            self._executor.evaluation_context.create_empty_committed_bundle(
+                applied_ptransform.inputs[0]))
+        timer_completion_callback = _TimerCompletionCallback(
+            self._executor.evaluation_context, self._executor.all_updates,
+            applied_ptransform)
+
+        self._executor.schedule_consumption(
+            applied_ptransform, empty_bundle, timer_completion_callback)
+      return bool(fired_timers)
+
+    def _is_executing(self):
+      """Returns True if there is at least one non-blocked TransformExecutor."""
+      for transform_executor in (
+          self._executor.transform_executor_services.executors):
+        if not transform_executor.blocked:
+          return True
+      return False
+
+    def _add_work_if_necessary(self, timers_fired):
+      """Adds more work from the roots if pipeline requires more input.
+
+      If all active TransformExecutors are in a blocked state, add more work
+      from root nodes that may have additional work. This ensures that if a
+      pipeline has elements available from the root nodes it will add those
+      elements when necessary.
+
+      Args:
+        timers_fired: True if any timers fired prior to this call.
+      """
+      # If any timers have fired, they will add more work; no need to add more.
+      if timers_fired:
+        return
+
+      if self._is_executing():
+        # We have at least one executor that can proceed without adding
+        # additional work.
+        return
+
+      # All current TransformExecutors are blocked; add more work from the
+      # roots.
+      for applied_transform in self._executor.root_nodes:
+        if not self._executor.evaluation_context.is_done(applied_transform):
+          self._executor.schedule_consumption(
+              applied_transform, None,
+              self._executor.default_completion_callback)
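
A minimal sketch of the ExecutorService contract defined above (PrintTask is a
hypothetical task; Python 2 syntax to match the module):

    from apache_beam.runners.inprocess.inprocess_executor import ExecutorService

    class PrintTask(ExecutorService.CallableTask):

      def __call__(self):
        print 'hello from a worker thread'

      @property
      def name(self):
        return 'print-task'

    service = ExecutorService(2)  # two worker threads
    service.submit(PrintTask())
    service.shutdown()            # workers exit after finishing their task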

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
new file mode 100644
index 0000000..287c170
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""InProcessPipelineRunner, executing on the local machine."""
+
+from __future__ import absolute_import
+
+import collections
+import logging
+
+from apache_beam.runners.inprocess.bundle_factory import BundleFactory
+from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
+from apache_beam.runners.inprocess.inprocess_evaluation_context import InProcessEvaluationContext
+from apache_beam.runners.inprocess.inprocess_executor import InProcessExecutor
+from apache_beam.runners.inprocess.transform_evaluator import TransformEvaluatorRegistry
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineRunner
+from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.runner import PValueCache
+
+
+class InProcessPipelineRunner(PipelineRunner):
+  """Executes a single pipeline on the local machine."""
+
+  def __init__(self):
+    self._cache = None
+
+  def run(self, pipeline):
+    """Execute the entire pipeline and returns an InProcessPipelineResult."""
+    logging.info('Running pipeline with InProcessPipelineRunner.')
+    self.visitor = ConsumerTrackingPipelineVisitor()
+    pipeline.visit(self.visitor)
+
+    evaluation_context = InProcessEvaluationContext(
+        pipeline.options,
+        BundleFactory(),
+        self.visitor.root_transforms,
+        self.visitor.value_to_consumers,
+        self.visitor.step_names,
+        self.visitor.views)
+
+    evaluation_context.use_pvalue_cache(self._cache)
+
+    executor = InProcessExecutor(self.visitor.value_to_consumers,
+                                 TransformEvaluatorRegistry(evaluation_context),
+                                 evaluation_context)
+    # Start the executor. This is a non-blocking call, it will start the
+    # execution in background threads and return.
+    executor.start(self.visitor.root_transforms)
+    result = InProcessPipelineResult(executor, evaluation_context)
+
+    # TODO(altay): If blocking:
+    # Block until the pipeline completes. This call will return after the
+    # pipeline was fully terminated (successfully or with a failure).
+    result.await_completion()
+
+    if self._cache:
+      self._cache.finalize()
+
+    return result
+
+  @property
+  def cache(self):
+    if not self._cache:
+      self._cache = InProcessBufferingInMemoryCache()
+    return self._cache.pvalue_cache
+
+  def apply(self, transform, input):  # pylint: disable=redefined-builtin
+    """Runner callback for a pipeline.apply call."""
+    return transform.apply(input)
+
+
+class InProcessBufferingInMemoryCache(object):
+  """PValueCache wrapper for buffering bundles until a PValue is fully computed.
+
+  InProcessBufferingInMemoryCache keeps an in memory cache of
+  (applied_ptransform, tag) tuples. It accepts appending to existing cache
+  entries until it is finalized. finalize() will make all the existing cached
+  entries visible to the underyling PValueCache in their entirety, clean the in
+  memory cache and stop accepting new cache entries.
+  """
+
+  def __init__(self):
+    self._cache = collections.defaultdict(list)
+    self._pvalue_cache = PValueCache()
+    self._finalized = False
+
+  @property
+  def pvalue_cache(self):
+    return self._pvalue_cache
+
+  def append(self, applied_ptransform, tag, elements):
+    assert not self._finalized
+    assert elements is not None
+    self._cache[(applied_ptransform, tag)].extend(elements)
+
+  def finalize(self):
+    """Make buffered cache elements visible to the underlying PValueCache."""
+    assert not self._finalized
+    for key, value in self._cache.iteritems():
+      applied_ptransform, tag = key
+      self._pvalue_cache.cache_output(applied_ptransform, tag, value)
+    self._cache = None
+    self._finalized = True
+
+
+class InProcessPipelineResult(PipelineResult):
+  """A InProcessPipelineResult provides access to info about a pipeline."""
+
+  def __init__(self, executor, evaluation_context):
+    super(InProcessPipelineResult, self).__init__(PipelineState.RUNNING)
+    self._executor = executor
+    self._evaluation_context = evaluation_context
+
+  def _is_in_terminal_state(self):
+    return self._state is not PipelineState.RUNNING
+
+  def await_completion(self):
+    if not self._is_in_terminal_state():
+      try:
+        self._executor.await_completion()
+        self._state = PipelineState.DONE
+      except:  # pylint: disable=broad-except
+        self._state = PipelineState.FAILED
+        raise
+    return self._state
+
+  def aggregated_values(self, aggregator_or_name):
+    return self._evaluation_context.get_aggregator_values(aggregator_or_name)
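
A sketch of the buffering cache contract (applied_ptransform stands for any
AppliedPTransform used as a cache key):

    from apache_beam.runners.inprocess.inprocess_runner import InProcessBufferingInMemoryCache

    cache = InProcessBufferingInMemoryCache()
    cache.append(applied_ptransform, None, ['a', 'b'])
    cache.append(applied_ptransform, None, ['c'])  # extends the same entry
    cache.finalize()  # flush to the PValueCache; no further appends allowed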

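For orientation, here is a minimal usage sketch of the runner defined above.
The pipeline contents are purely illustrative, and the top-level beam.Create/
beam.Map aliases are assumed to follow this branch's SDK conventions rather
than taken from this commit:

    import apache_beam as beam
    from apache_beam.runners.inprocess.inprocess_runner import (
        InProcessPipelineRunner)

    # run() above blocks until the bundled execution terminates, so by the
    # time it returns, the InProcessPipelineResult is DONE (or the failure
    # has been re-raised).
    p = beam.Pipeline(runner=InProcessPipelineRunner())
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
    result = p.run()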
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
new file mode 100644
index 0000000..4e84147
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for InProcessPipelineRunner."""
+
+import logging
+import unittest
+
+from apache_beam import Pipeline
+import apache_beam.examples.snippets.snippets_test as snippets_test
+import apache_beam.io.fileio_test as fileio_test
+import apache_beam.io.sources_test as sources_test
+import apache_beam.pipeline_test as pipeline_test
+import apache_beam.pvalue_test as pvalue_test
+from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner
+import apache_beam.transforms.aggregator_test as aggregator_test
+import apache_beam.transforms.combiners_test as combiners_test
+import apache_beam.transforms.ptransform_test as ptransform_test
+import apache_beam.transforms.trigger_test as trigger_test
+import apache_beam.transforms.window_test as window_test
+import apache_beam.transforms.write_ptransform_test as write_ptransform_test
+import apache_beam.typehints.typed_pipeline_test as typed_pipeline_test
+
+
+class TestWithInProcessPipelineRunner(object):
+
+  def setUp(self):
+    original_init = Pipeline.__init__
+
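+    # Ignore any runner passed in and force InProcessPipelineRunner for
+    # every Pipeline constructed while the test runs.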
+    def override_pipeline_init(self, runner=None, options=None, argv=None):
+      runner = InProcessPipelineRunner()
+      return original_init(self, runner, options, argv)
+
+    self.runner_name = None
+    self.original_init = original_init
+    Pipeline.__init__ = override_pipeline_init
+
+  def tearDown(self):
+    Pipeline.__init__ = self.original_init
+
+
+class InProcessPipelineRunnerPipelineTest(
+    TestWithInProcessPipelineRunner, pipeline_test.PipelineTest):
+
+  def test_cached_pvalues_are_refcounted(self):
+    # InProcessPipelineRunner does not have a refcounted cache.
+    pass
+
+  def test_eager_pipeline(self):
+    # Tests the eager runner only.
+    pass
+
+
+class InProcessPipelineRunnerSnippetsTest(
+    TestWithInProcessPipelineRunner, snippets_test.SnippetsTest,
+    snippets_test.ParDoTest, snippets_test.TypeHintsTest,
+    snippets_test.CombineTest):
+  pass
+
+
+class InProcessPipelineRunnerTransform(
+    TestWithInProcessPipelineRunner, aggregator_test.AggregatorTest,
+    combiners_test.CombineTest, ptransform_test.PTransformTest,
+    pvalue_test.PValueTest, window_test.WindowTest,
+    typed_pipeline_test.MainInputTest, typed_pipeline_test.SideInputTest,
+    typed_pipeline_test.CustomTransformTest, trigger_test.TriggerPipelineTest,
+    write_ptransform_test.WriteTest):
+  pass
+
+
+class TestTextFileSource(
+    TestWithInProcessPipelineRunner, fileio_test.TestTextFileSource):
+  pass
+
+
+class NativeTestTextFileSink(
+    TestWithInProcessPipelineRunner, fileio_test.NativeTestTextFileSink):
+  pass
+
+
+class TestTextFileSink(
+    TestWithInProcessPipelineRunner, fileio_test.TestTextFileSink):
+
+  def setUp(self):
+    TestWithInProcessPipelineRunner.setUp(self)
+    fileio_test.TestTextFileSink.setUp(self)
+
+
+class MyFileSink(TestWithInProcessPipelineRunner, fileio_test.MyFileSink):
+  pass
+
+
+class TestFileSink(TestWithInProcessPipelineRunner, fileio_test.TestFileSink):
+  pass
+
+
+class SourcesTest(TestWithInProcessPipelineRunner, sources_test.SourcesTest):
+  pass
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()

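The classes above reuse the existing DirectRunner-oriented suites through
multiple inheritance: the mixin comes first in the base-class list, so its
setUp runs (per Python's MRO) and forces every Pipeline constructed by the
inherited tests onto InProcessPipelineRunner. A self-contained sketch of the
same monkey-patching pattern, with purely hypothetical names:

    import unittest

    class Greeter(object):
        # Stand-in for Pipeline: the class whose method gets patched.
        def greet(self):
            return 'hello'

    class GreeterTest(unittest.TestCase):
        # Plays the role of an existing DirectRunner test suite.
        def test_greet(self):
            self.assertTrue(Greeter().greet().startswith('hello'))

    class ForcePatchMixin(object):
        # Plays the role of TestWithInProcessPipelineRunner: swap in an
        # override before each test, restore the original afterwards.
        def setUp(self):
            self._original_greet = Greeter.greet
            Greeter.greet = lambda self: 'hello from patch'

        def tearDown(self):
            Greeter.greet = self._original_greet

    class PatchedGreeterTest(ForcePatchMixin, GreeterTest):
        # Every test inherited from GreeterTest now runs patched.
        pass

    if __name__ == '__main__':
        unittest.main()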
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py b/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
new file mode 100644
index 0000000..798ebfb
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+from __future__ import absolute_import
+
+
+class InProcessTransformResult(object):
+  """The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+  def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
+               timer_update, counters, watermark_hold,
+               undeclared_tag_values=None):
+    self._applied_ptransform = applied_ptransform
+    self._uncommitted_output_bundles = uncommitted_output_bundles
+    self._state = state
+    self._timer_update = timer_update
+    self._counters = counters
+    self._watermark_hold = watermark_hold
+    # Only used when caching (materializing) all values is requested.
+    self._undeclared_tag_values = undeclared_tag_values
+
+  @property
+  def transform(self):
+    return self._applied_ptransform
+
+  @property
+  def output_bundles(self):
+    return self._uncommitted_output_bundles
+
+  @property
+  def state(self):
+    return self._state
+
+  @property
+  def counters(self):
+    return self._counters
+
+  @property
+  def watermark_hold(self):
+    return self._watermark_hold
+
+  @property
+  def undeclared_tag_values(self):
+    return self._undeclared_tag_values

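Downstream, the evaluation context reads these fields back out: the output
bundles feed the watermark manager's pending-bundle bookkeeping and
watermark_hold feeds TransformWatermarks.hold (both shown in the next file).
A hypothetical construction with placeholder arguments, purely to show the
shape of the API:

    from apache_beam.runners.inprocess.inprocess_transform_result import (
        InProcessTransformResult)

    # All values are placeholders; a real evaluator passes the
    # AppliedPTransform it just ran, its uncommitted output bundles, any
    # state/timer/counter updates, and an optional watermark hold.
    result = InProcessTransformResult(
        applied_ptransform=None,
        uncommitted_output_bundles=[],
        state=None,
        timer_update=[],
        counters=[],
        watermark_hold=None)
    assert result.output_bundles == []
    assert result.watermark_hold is None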
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f64d0a4/sdks/python/apache_beam/runners/inprocess/inprocess_watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_watermark_manager.py b/sdks/python/apache_beam/runners/inprocess/inprocess_watermark_manager.py
new file mode 100644
index 0000000..3530c9d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_watermark_manager.py
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Manages watermarks of PCollections and AppliedPTransforms."""
+
+from __future__ import absolute_import
+
+import threading
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+
+
+class InProcessWatermarkManager(object):
+  """Tracks and updates watermarks for all AppliedPTransforms."""
+
+  WATERMARK_POS_INF = MAX_TIMESTAMP
+  WATERMARK_NEG_INF = MIN_TIMESTAMP
+
+  def __init__(self, clock, root_transforms, value_to_consumers):
+    self._clock = clock  # processing time clock
+    self._value_to_consumers = value_to_consumers
+    self._root_transforms = root_transforms
+    # AppliedPTransform -> TransformWatermarks
+    self._transform_to_watermarks = {}
+
+    for root_transform in root_transforms:
+      self._transform_to_watermarks[root_transform] = TransformWatermarks(
+          self._clock)
+
+    for consumers in value_to_consumers.values():
+      for consumer in consumers:
+        self._transform_to_watermarks[consumer] = TransformWatermarks(
+            self._clock)
+
+    for consumers in value_to_consumers.values():
+      for consumer in consumers:
+        self._update_input_transform_watermarks(consumer)
+
+  def _update_input_transform_watermarks(self, applied_ptransform):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    input_transform_watermarks = []
+    for input_pvalue in applied_ptransform.inputs:
+      assert input_pvalue.producer or isinstance(input_pvalue, pvalue.PBegin)
+      if input_pvalue.producer:
+        input_transform_watermarks.append(
+            self.get_watermarks(input_pvalue.producer))
+    self._transform_to_watermarks[
+        applied_ptransform].update_input_transform_watermarks(
+            input_transform_watermarks)
+
+  def get_watermarks(self, applied_ptransform):
+    """Gets the input and output watermarks for an AppliedPTransform.
+
+    If the applied_ptransform has not processed any elements, return a
+    watermark with minimum value.
+
+    Args:
+      applied_ptransform: AppliedPTransform to get the watermarks for.
+
+    Returns:
+      A snapshot (TransformWatermarks) of the input watermark and output
+      watermark for the provided transform.
+    """
+
+    # TODO(altay): Composite transforms should have a composite watermark. Until
+    # then they are represented by their last transform.
+    while applied_ptransform.parts:
+      applied_ptransform = applied_ptransform.parts[-1]
+
+    return self._transform_to_watermarks[applied_ptransform]
+
+  def update_watermarks(self, completed_committed_bundle, applied_ptransform,
+                        timer_update, outputs, earliest_hold):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    self._update_pending(
+        completed_committed_bundle, applied_ptransform, timer_update, outputs)
+    tw = self.get_watermarks(applied_ptransform)
+    tw.hold(earliest_hold)
+    self._refresh_watermarks(applied_ptransform)
+
+  def _update_pending(self, input_committed_bundle, applied_ptransform,
+                      timer_update, output_committed_bundles):
+    """Updated list of pending bundles for the given AppliedPTransform."""
+
+    # Update pending elements. Filter out empty bundles. They do not impact
+    # watermarks and should not trigger downstream execution.
+    for output in output_committed_bundles:
+      if output.elements:
+        if output.pcollection in self._value_to_consumers:
+          consumers = self._value_to_consumers[output.pcollection]
+          for consumer in consumers:
+            consumer_tw = self._transform_to_watermarks[consumer]
+            consumer_tw.add_pending(output)
+
+    completed_tw = self._transform_to_watermarks[applied_ptransform]
+    completed_tw.update_timers(timer_update)
+
+    assert input_committed_bundle or applied_ptransform in self._root_transforms
+    if input_committed_bundle and input_committed_bundle.elements:
+      completed_tw.remove_pending(input_committed_bundle)
+
+  def _refresh_watermarks(self, applied_ptransform):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    tw = self.get_watermarks(applied_ptransform)
+    if tw.refresh():
+      for pval in applied_ptransform.outputs.values():
+        if isinstance(pval, pvalue.DoOutputsTuple):
+          pvals = (v for v in pval)
+        else:
+          pvals = (pval,)
+        for v in pvals:
+          if v in self._value_to_consumers:  # If there are downstream consumers
+            consumers = self._value_to_consumers[v]
+            for consumer in consumers:
+              self._refresh_watermarks(consumer)
+
+  def extract_fired_timers(self):
+    all_timers = []
+    for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
+      if tw.extract_fired_timers():
+        all_timers.append(applied_ptransform)
+    return all_timers
+
+
+class TransformWatermarks(object):
+  """Tracks input and output watermarks for aan AppliedPTransform."""
+
+  def __init__(self, clock):
+    self._clock = clock
+    self._input_transform_watermarks = []
+    self._input_watermark = InProcessWatermarkManager.WATERMARK_NEG_INF
+    self._output_watermark = InProcessWatermarkManager.WATERMARK_NEG_INF
+    self._earliest_hold = InProcessWatermarkManager.WATERMARK_POS_INF
+    self._pending = set()  # Scheduled bundles targeted for this transform.
+    self._fired_timers = False
+    self._lock = threading.Lock()
+
+  def update_input_transform_watermarks(self, input_transform_watermarks):
+    with self._lock:
+      self._input_transform_watermarks = input_transform_watermarks
+
+  def update_timers(self, timer_update):
+    with self._lock:
+      if timer_update:
+        assert self._fired_timers
+        self._fired_timers = False
+
+  @property
+  def input_watermark(self):
+    with self._lock:
+      return self._input_watermark
+
+  @property
+  def output_watermark(self):
+    with self._lock:
+      return self._output_watermark
+
+  def hold(self, value):
+    with self._lock:
+      if value is None:
+        value = InProcessWatermarkManager.WATERMARK_POS_INF
+      self._earliest_hold = value
+
+  def add_pending(self, pending):
+    with self._lock:
+      self._pending.add(pending)
+
+  def remove_pending(self, completed):
+    with self._lock:
+      # Ignore repeated removes. This will happen if a transform has a repeated
+      # input.
+      if completed in self._pending:
+        self._pending.remove(completed)
+
+  def refresh(self):
+    with self._lock:
+      pending_holder = (InProcessWatermarkManager.WATERMARK_NEG_INF
+                        if self._pending else
+                        InProcessWatermarkManager.WATERMARK_POS_INF)
+
+      input_watermarks = [
+          tw.output_watermark for tw in self._input_transform_watermarks]
+      input_watermarks.append(InProcessWatermarkManager.WATERMARK_POS_INF)
+      producer_watermark = min(input_watermarks)
+
+      self._input_watermark = max(self._input_watermark,
+                                  min(pending_holder, producer_watermark))
+      new_output_watermark = min(self._input_watermark, self._earliest_hold)
+
+      advanced = new_output_watermark > self._output_watermark
+      self._output_watermark = new_output_watermark
+      return advanced
+
+  @property
+  def synchronized_processing_output_time(self):
+    return self._clock.now
+
+  def extract_fired_timers(self):
+    with self._lock:
+      if self._fired_timers:
+        return False
+
+      should_fire = (
+          self._earliest_hold < InProcessWatermarkManager.WATERMARK_POS_INF and
+          self._input_watermark == InProcessWatermarkManager.WATERMARK_POS_INF)
+      self._fired_timers = should_fire
+      return should_fire
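
To make the arithmetic in refresh() concrete, here is a standalone sketch of
the same update rule with plain numbers standing in for timestamps (the real
code uses MIN_TIMESTAMP/MAX_TIMESTAMP and holds a lock; this is illustration
only):

    POS_INF, NEG_INF = float('inf'), float('-inf')

    def refresh(old_input_wm, old_output_wm, pending, producer_wms,
                earliest_hold):
        # Pending bundles pin the input watermark at -inf; otherwise it
        # may rise to the slowest producer's output watermark.
        pending_holder = NEG_INF if pending else POS_INF
        producer_wm = min(producer_wms + [POS_INF])
        input_wm = max(old_input_wm, min(pending_holder, producer_wm))
        # The output watermark never passes an explicit hold.
        output_wm = min(input_wm, earliest_hold)
        return input_wm, output_wm, output_wm > old_output_wm

    # A pending bundle keeps both watermarks at -inf:
    print(refresh(NEG_INF, NEG_INF, True, [10], POS_INF))
    # -> (-inf, -inf, False)

    # Once it is committed, both advance to the producer's watermark:
    print(refresh(NEG_INF, NEG_INF, False, [10], POS_INF))
    # -> (10, 10, True)

By the same token, extract_fired_timers() reports a transform only once its
input watermark has reached +inf while an explicit hold is still keeping the
output watermark back.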