Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:59:34 UTC

[4/5] incubator-beam git commit: Renames InprocessPipelineRunner to DirectPipelineRunner and removes the existing DirectPipelineRunner. Renamed the folder to direct to keep all related files in the same folder. Removed inprocess prefix from file names/cl

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
new file mode 100644
index 0000000..f439731
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Manages watermarks of PCollections and AppliedPTransforms."""
+
+from __future__ import absolute_import
+
+import threading
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+
+
+class WatermarkManager(object):
+  """Tracks and updates watermarks for all AppliedPTransforms."""
+
+  WATERMARK_POS_INF = MAX_TIMESTAMP
+  WATERMARK_NEG_INF = MIN_TIMESTAMP
+
+  def __init__(self, clock, root_transforms, value_to_consumers):
+    self._clock = clock  # processing time clock
+    self._value_to_consumers = value_to_consumers
+    self._root_transforms = root_transforms
+    # AppliedPTransform -> TransformWatermarks
+    self._transform_to_watermarks = {}
+
+    for root_transform in root_transforms:
+      self._transform_to_watermarks[root_transform] = TransformWatermarks(
+          self._clock)
+
+    for consumers in value_to_consumers.values():
+      for consumer in consumers:
+        self._transform_to_watermarks[consumer] = TransformWatermarks(
+            self._clock)
+
+    for consumers in value_to_consumers.values():
+      for consumer in consumers:
+        self._update_input_transform_watermarks(consumer)
+
+  def _update_input_transform_watermarks(self, applied_ptransform):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    input_transform_watermarks = []
+    for input_pvalue in applied_ptransform.inputs:
+      assert input_pvalue.producer or isinstance(input_pvalue, pvalue.PBegin)
+      if input_pvalue.producer:
+        input_transform_watermarks.append(
+            self.get_watermarks(input_pvalue.producer))
+    self._transform_to_watermarks[
+        applied_ptransform].update_input_transform_watermarks(
+            input_transform_watermarks)
+
+  def get_watermarks(self, applied_ptransform):
+    """Gets the input and output watermarks for an AppliedPTransform.
+
+    If the applied_ptransform has not processed any elements, returns a
+    watermark with the minimum value.
+
+    Args:
+      applied_ptransform: AppliedPTransform to get the watermarks for.
+
+    Returns:
+      A snapshot (TransformWatermarks) of the input watermark and output
+      watermark for the provided transform.
+    """
+
+    # TODO(altay): Composite transforms should have a composite watermark. Until
+    # then they are represented by their last transform.
+    while applied_ptransform.parts:
+      applied_ptransform = applied_ptransform.parts[-1]
+
+    return self._transform_to_watermarks[applied_ptransform]
+
+  def update_watermarks(self, completed_committed_bundle, applied_ptransform,
+                        timer_update, outputs, earliest_hold):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    self._update_pending(
+        completed_committed_bundle, applied_ptransform, timer_update, outputs)
+    tw = self.get_watermarks(applied_ptransform)
+    tw.hold(earliest_hold)
+    self._refresh_watermarks(applied_ptransform)
+
+  def _update_pending(self, input_committed_bundle, applied_ptransform,
+                      timer_update, output_committed_bundles):
+    """Updated list of pending bundles for the given AppliedPTransform."""
+
+    # Update pending elements. Filter out empty bundles. They do not impact
+    # watermarks and should not trigger downstream execution.
+    for output in output_committed_bundles:
+      if output.elements:
+        if output.pcollection in self._value_to_consumers:
+          consumers = self._value_to_consumers[output.pcollection]
+          for consumer in consumers:
+            consumer_tw = self._transform_to_watermarks[consumer]
+            consumer_tw.add_pending(output)
+
+    completed_tw = self._transform_to_watermarks[applied_ptransform]
+    completed_tw.update_timers(timer_update)
+
+    assert input_committed_bundle or applied_ptransform in self._root_transforms
+    if input_committed_bundle and input_committed_bundle.elements:
+      completed_tw.remove_pending(input_committed_bundle)
+
+  def _refresh_watermarks(self, applied_ptransform):
+    assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    tw = self.get_watermarks(applied_ptransform)
+    if tw.refresh():
+      for pval in applied_ptransform.outputs.values():
+        if isinstance(pval, pvalue.DoOutputsTuple):
+          pvals = (v for v in pval)
+        else:
+          pvals = (pval,)
+        for v in pvals:
+          if v in self._value_to_consumers:  # If there are downstream consumers
+            consumers = self._value_to_consumers[v]
+            for consumer in consumers:
+              self._refresh_watermarks(consumer)
+
+  def extract_fired_timers(self):
+    all_timers = []
+    for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
+      if tw.extract_fired_timers():
+        all_timers.append(applied_ptransform)
+    return all_timers
+
+
+class TransformWatermarks(object):
+  """Tracks input and output watermarks for aan AppliedPTransform."""
+
+  def __init__(self, clock):
+    self._clock = clock
+    self._input_transform_watermarks = []
+    self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
+    self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
+    self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
+    self._pending = set()  # Scheduled bundles targeted for this transform.
+    self._fired_timers = False
+    self._lock = threading.Lock()
+
+  def update_input_transform_watermarks(self, input_transform_watermarks):
+    with self._lock:
+      self._input_transform_watermarks = input_transform_watermarks
+
+  def update_timers(self, timer_update):
+    with self._lock:
+      if timer_update:
+        assert self._fired_timers
+        self._fired_timers = False
+
+  @property
+  def input_watermark(self):
+    with self._lock:
+      return self._input_watermark
+
+  @property
+  def output_watermark(self):
+    with self._lock:
+      return self._output_watermark
+
+  def hold(self, value):
+    with self._lock:
+      if value is None:
+        value = WatermarkManager.WATERMARK_POS_INF
+      self._earliest_hold = value
+
+  def add_pending(self, pending):
+    with self._lock:
+      self._pending.add(pending)
+
+  def remove_pending(self, completed):
+    with self._lock:
+      # Ignore repeated removes. This will happen if a transform has a repeated
+      # input.
+      if completed in self._pending:
+        self._pending.remove(completed)
+
+  def refresh(self):
+    with self._lock:
+      pending_holder = (WatermarkManager.WATERMARK_NEG_INF
+                        if self._pending else
+                        WatermarkManager.WATERMARK_POS_INF)
+
+      input_watermarks = [
+          tw.output_watermark for tw in self._input_transform_watermarks]
+      input_watermarks.append(WatermarkManager.WATERMARK_POS_INF)
+      producer_watermark = min(input_watermarks)
+
+      self._input_watermark = max(self._input_watermark,
+                                  min(pending_holder, producer_watermark))
+      new_output_watermark = min(self._input_watermark, self._earliest_hold)
+
+      advanced = new_output_watermark > self._output_watermark
+      self._output_watermark = new_output_watermark
+      return advanced
+
+  @property
+  def synchronized_processing_output_time(self):
+    return self._clock.now
+
+  def extract_fired_timers(self):
+    with self._lock:
+      if self._fired_timers:
+        return False
+
+      should_fire = (
+          self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
+          self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
+      self._fired_timers = should_fire
+      return should_fire

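A note on the semantics of refresh() above: pending bundles pin a transform's
input watermark at negative infinity, the slowest upstream producer bounds it
from above, and a hold caps the output watermark. A minimal standalone sketch
of that logic, with plain floats standing in for Beam Timestamp objects (the
names here are illustrative, not Beam APIs):

NEG_INF, POS_INF = float('-inf'), float('inf')

def refresh(input_wm, output_wm, earliest_hold, has_pending,
            upstream_output_wms):
  # Pending bundles pin the input watermark at -inf until processed.
  pending_holder = NEG_INF if has_pending else POS_INF
  # A transform can never be ahead of the slowest of its producers.
  producer_wm = min(upstream_output_wms + [POS_INF])
  # Watermarks only move forward (max with the previous value).
  input_wm = max(input_wm, min(pending_holder, producer_wm))
  # A hold (e.g. for buffered state) caps the output watermark.
  new_output_wm = min(input_wm, earliest_hold)
  advanced = new_output_wm > output_wm
  return input_wm, new_output_wm, advanced

# With a pending bundle the watermark stays at -inf ...
print(refresh(NEG_INF, NEG_INF, POS_INF, True, [42.0]))   # (-inf, -inf, False)
# ... and advances to the producer watermark once the bundle is processed.
print(refresh(NEG_INF, NEG_INF, POS_INF, False, [42.0]))  # (42.0, 42.0, True)
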
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
deleted file mode 100644
index c4c52b3..0000000
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ /dev/null
@@ -1,308 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""DirectPipelineRunner, executing on the local machine.
-
-The DirectPipelineRunner class implements what is called in Dataflow
-parlance the "direct runner". Such a runner executes the entire graph
-of transformations belonging to a pipeline on the local machine.
-"""
-
-from __future__ import absolute_import
-
-import collections
-import itertools
-import logging
-
-from apache_beam import coders
-from apache_beam import error
-from apache_beam.runners.common import DoFnRunner
-from apache_beam.runners.common import DoFnState
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import sideinputs
-from apache_beam.transforms.window import GlobalWindows
-from apache_beam.transforms.window import WindowedValue
-from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
-from apache_beam.typehints.typecheck import TypeCheckError
-from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
-from apache_beam.utils import counters
-from apache_beam.utils.options import TypeOptions
-
-
-class DirectPipelineRunner(PipelineRunner):
-  """A local pipeline runner.
-
-  The runner computes everything locally and does not make any attempt to
-  optimize for time or space.
-  """
-
-  def __init__(self, cache=None):
-    # Cache of values computed while the runner executes a pipeline.
-    self._cache = cache if cache is not None else PValueCache()
-    self._counter_factory = counters.CounterFactory()
-    # Element counts used for debugging footprint issues in the direct runner.
-    # The values computed are used only for logging and do not take part in
-    # any decision making logic. The key for the counter dictionary is either
-    # the full label for the transform producing the elements or a tuple
-    # (full label, output tag) for ParDo transforms since they can output values
-    # on multiple outputs.
-    self.debug_counters = {}
-    self.debug_counters['element_counts'] = collections.Counter()
-
-  @property
-  def cache(self):
-    return self._cache
-
-  def get_pvalue(self, pvalue):
-    """Gets the PValue's computed value from the runner's cache."""
-    try:
-      return self._cache.get_pvalue(pvalue)
-    except KeyError:
-      raise error.PValueError('PValue is not computed.')
-
-  def clear_pvalue(self, pvalue):
-    """Removes a PValue from the runner's cache."""
-    self._cache.clear_pvalue(pvalue)
-
-  def skip_if_cached(func):  # pylint: disable=no-self-argument
-    """Decorator to skip execution of a transform if value is cached."""
-
-    def func_wrapper(self, pvalue, *args, **kwargs):
-      logging.debug('Current: Debug counters: %s', self.debug_counters)
-      if self._cache.is_cached(pvalue):  # pylint: disable=protected-access
-        return
-      else:
-        func(self, pvalue, *args, **kwargs)
-    return func_wrapper
-
-  def run(self, pipeline):
-    super(DirectPipelineRunner, self).run(pipeline)
-    logging.info('Final: Debug counters: %s', self.debug_counters)
-    return DirectPipelineResult(state=PipelineState.DONE,
-                                counter_factory=self._counter_factory)
-
-  @skip_if_cached
-  def run_CreatePCollectionView(self, transform_node):
-    transform = transform_node.transform
-    view = transform.view
-    values = self._cache.get_pvalue(transform_node.inputs[0])
-    result = sideinputs.SideInputMap(type(view), view._view_options(), values)
-    self._cache.cache_output(transform_node, result)
-
-  @skip_if_cached
-  def run_ParDo(self, transform_node):
-    transform = transform_node.transform
-
-    side_inputs = [self._cache.get_pvalue(view)
-                   for view in transform_node.side_inputs]
-
-    # TODO(robertwb): Do this type checking inside DoFnRunner to get it on
-    # remote workers as well?
-    options = transform_node.inputs[0].pipeline.options
-    if options is not None and options.view_as(TypeOptions).runtime_type_check:
-      transform.dofn = TypeCheckWrapperDoFn(
-          transform.dofn, transform.get_type_hints())
-
-    # TODO(robertwb): Should this be conditionally done on the workers as well?
-    transform.dofn = OutputCheckWrapperDoFn(
-        transform.dofn, transform_node.full_label)
-
-    class RecordingReceiverSet(object):
-
-      def __init__(self, tag):
-        self.tag = tag
-
-      def output(self, element):
-        results[self.tag].append(element)
-
-    class TaggedReceivers(dict):
-
-      def __missing__(self, key):
-        return RecordingReceiverSet(key)
-
-    results = collections.defaultdict(list)
-    # Some tags may be empty.
-    for tag in transform.side_output_tags:
-      results[tag] = []
-
-    runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs,
-                        side_inputs, transform_node.inputs[0].windowing,
-                        tagged_receivers=TaggedReceivers(),
-                        step_name=transform_node.full_label,
-                        state=DoFnState(self._counter_factory))
-    runner.start()
-    for v in self._cache.get_pvalue(transform_node.inputs[0]):
-      runner.process(v)
-    runner.finish()
-
-    self._cache.cache_output(transform_node, [])
-    for tag, value in results.items():
-      self.debug_counters['element_counts'][
-          (transform_node.full_label, tag)] += len(value)
-      self._cache.cache_output(transform_node, tag, value)
-
-  @skip_if_cached
-  def run_GroupByKeyOnly(self, transform_node):
-    result_dict = collections.defaultdict(list)
-    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
-    kv_type_hint = transform_node.transform.get_type_hints().input_types[0]
-    key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-    for wv in self._cache.get_pvalue(transform_node.inputs[0]):
-      if (isinstance(wv, WindowedValue) and
-          isinstance(wv.value, collections.Iterable) and len(wv.value) == 2):
-        k, v = wv.value
-        # We use a string encoding of the key object as the key to support
-        # keys that are based on custom classes. This also mimics the remote
-        # execution behavior, where key objects are encoded before being
-        # written to the shuffler system responsible for grouping.
-        result_dict[key_coder.encode(k)].append(v)
-      else:
-        raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
-                             'windowed key-value pairs. Instead received: %r.'
-                             % wv)
-
-    gbk_result = map(
-        GlobalWindows.windowed_value,
-        ((key_coder.decode(k), v) for k, v in result_dict.iteritems()))
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(gbk_result)
-    self._cache.cache_output(transform_node, gbk_result)
-
-  @skip_if_cached
-  def run_Create(self, transform_node):
-    transform = transform_node.transform
-    create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(create_result)
-    self._cache.cache_output(transform_node, create_result)
-
-  @skip_if_cached
-  def run_Flatten(self, transform_node):
-    flatten_result = list(
-        itertools.chain.from_iterable(
-            self._cache.get_pvalue(pc) for pc in transform_node.inputs))
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(flatten_result)
-    self._cache.cache_output(transform_node, flatten_result)
-
-  @skip_if_cached
-  def run_Read(self, transform_node):
-    # TODO(chamikara) Implement a more generic way for passing PipelineOptions
-    # to sources and sinks when using DirectRunner.
-    source = transform_node.transform.source
-    source.pipeline_options = transform_node.inputs[0].pipeline.options
-
-    def read_values(reader):
-      read_result = [GlobalWindows.windowed_value(e) for e in reader]
-      self.debug_counters['element_counts'][
-          transform_node.full_label] += len(read_result)
-      self._cache.cache_output(transform_node, read_result)
-
-    # pylint: disable=wrong-import-position
-    from apache_beam.io import iobase
-
-    if isinstance(source, iobase.BoundedSource):
-      # Getting a RangeTracker for the default range of the source and reading
-      # the full source using that.
-      range_tracker = source.get_range_tracker(None, None)
-      reader = source.read(range_tracker)
-      read_values(reader)
-    else:
-      with source.reader() as reader:
-        read_values(reader)
-
-  @skip_if_cached
-  def run__NativeWrite(self, transform_node):
-    sink = transform_node.transform.sink
-
-    # pylint: disable=wrong-import-position
-    from apache_beam.io import fileio
-
-    if isinstance(sink, fileio.NativeTextFileSink):
-      assert sink.num_shards in (0, 1)
-      if sink.shard_name_template:
-        sink.file_path += '-00000-of-00001'
-      sink.file_path += sink.file_name_suffix
-    sink.pipeline_options = transform_node.inputs[0].pipeline.options
-    with sink.writer() as writer:
-      for v in self._cache.get_pvalue(transform_node.inputs[0]):
-        self.debug_counters['element_counts'][transform_node.full_label] += 1
-        writer.Write(v.value)
-
-
-class DirectPipelineResult(PipelineResult):
-  """A DirectPipelineResult provides access to info about a pipeline."""
-
-  def __init__(self, state, counter_factory=None):
-    super(DirectPipelineResult, self).__init__(state)
-    self._counter_factory = counter_factory
-
-  def aggregated_values(self, aggregator_or_name):
-    return self._counter_factory.get_aggregator_values(aggregator_or_name)
-
-
-class EagerPipelineRunner(DirectPipelineRunner):
-
-  is_eager = True
-
-  def __init__(self):
-    super(EagerPipelineRunner, self).__init__()
-    self._seen_transforms = set()
-
-  def run_transform(self, transform):
-    if transform not in self._seen_transforms:
-      self._seen_transforms.add(transform)
-      super(EagerPipelineRunner, self).run_transform(transform)
-
-
-class DiskCachedPipelineRunner(DirectPipelineRunner):
-  """A DirectPipelineRunner that uses a disk backed cache.
-
-  DiskCachedPipelineRunner uses a temporary disk backed cache for running
-  pipelines. This allows for running pipelines that will require more memory
-  than it is available, however this comes with a performance cost due to disk
-  IO.
-
-  Memory requirement for DiskCachedPipelineRunner is approximately capped by the
-  single transform in the pipeline that consumes and outputs largest total
-  collection (i.e. inputs, side-inputs and outputs in aggregate). In the extreme
-  case a where a transform will use all previous intermediate values as input,
-  memory requirements for DiskCachedPipelineRunner will be the same as
-  DirectPipelineRunner.
-  """
-
-  def __init__(self):
-    self._null_cache = ()
-    super(DiskCachedPipelineRunner, self).__init__(self._null_cache)
-
-  def run(self, pipeline):
-    try:
-      self._cache = PValueCache(use_disk_backed_cache=True)
-      return super(DiskCachedPipelineRunner, self).run(pipeline)
-    finally:
-      del self._cache
-      self._cache = self._null_cache
-
-  @property
-  def cache(self):
-    raise NotImplementedError(
-        'DiskCachedPipelineRunner does not keep cache outside the scope of its '
-        'run method.')

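The skip_if_cached decorator in the deleted runner above is a simple
memoization guard: a run_* method becomes a no-op when its output value is
already in the runner's cache. A minimal standalone sketch of the pattern
(ToyCache and ToyRunner are hypothetical stand-ins for PValueCache and the
runner, not Beam classes):

def skip_if_cached(func):
  def func_wrapper(self, node, *args, **kwargs):
    if self._cache.is_cached(node):
      return  # Already computed; do not re-run the transform.
    func(self, node, *args, **kwargs)
  return func_wrapper

class ToyCache(object):
  def __init__(self):
    self._store = {}

  def is_cached(self, key):
    return key in self._store

  def put(self, key, value):
    self._store[key] = value

class ToyRunner(object):
  def __init__(self):
    self._cache = ToyCache()

  @skip_if_cached
  def run_Create(self, node):
    print('computing %s' % node)
    self._cache.put(node, [1, 2, 3])

runner = ToyRunner()
runner.run_Create('Create')  # computes and caches the value
runner.run_Create('Create')  # no-op: the value is already cached
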
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/__init__.py b/sdks/python/apache_beam/runners/inprocess/__init__.py
deleted file mode 100644
index 53e725a..0000000
--- a/sdks/python/apache_beam/runners/inprocess/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Inprocess runner executes pipelines locally in a single process."""
-from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py b/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
deleted file mode 100644
index d284449..0000000
--- a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A factory that creates UncommittedBundles."""
-
-from __future__ import absolute_import
-
-from apache_beam import pvalue
-
-
-class BundleFactory(object):
-  """BundleFactory creates output bundles to be used by transform evaluators."""
-
-  def create_bundle(self, output_pcollection):
-    return Bundle(output_pcollection)
-
-  def create_empty_committed_bundle(self, output_pcollection):
-    bundle = self.create_bundle(output_pcollection)
-    bundle.commit(None)
-    return bundle
-
-
-# A bundle represents a unit of work that will be processed by a transform.
-class Bundle(object):
-  """Part of a PCollection with output elements.
-
-  Elements are output to a bundle, which will cause them to be processed by the
-  PTransforms that consume the PCollection this bundle is a part of at a later
-  point. A bundle starts as uncommitted and can have elements added to it. It
-  must be committed, which makes it immutable, before it is passed to a
-  downstream ptransform.
-  """
-
-  def __init__(self, pcollection):
-    assert (isinstance(pcollection, pvalue.PCollection)
-            or isinstance(pcollection, pvalue.PCollectionView))
-    self._pcollection = pcollection
-    self._elements = []
-    self._committed = False
-    self._tag = None  # optional tag information for this bundle
-
-  @property
-  def elements(self):
-    """Returns iterable elements. If not committed will return a copy."""
-    if self._committed:
-      return self._elements
-    else:
-      return list(self._elements)
-
-  @property
-  def tag(self):
-    return self._tag
-
-  @tag.setter
-  def tag(self, value):
-    assert not self._tag
-    self._tag = value
-
-  @property
-  def pcollection(self):
-    """PCollection that the elements of this UncommittedBundle belong to."""
-    return self._pcollection
-
-  def add(self, element):
-    """Outputs an element to this bundle.
-
-    Args:
-      element: WindowedValue
-    """
-    assert not self._committed
-    self._elements.append(element)
-
-  def output(self, element):
-    self.add(element)
-
-  def commit(self, synchronized_processing_time):
-    """Commits this bundle.
-
-    The uncommitted bundle becomes committed (immutable) after this call.
-
-    Args:
-      synchronized_processing_time: the synchronized processing time at which
-        this bundle was committed.
-    """
-    assert not self._committed
-    self._committed = True
-    self._elements = tuple(self._elements)
-    self._synchronized_processing_time = synchronized_processing_time

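The Bundle class above enforces a two-phase lifecycle: elements may be added
only while the bundle is uncommitted, and commit() freezes it before it is
handed downstream. A minimal standalone sketch of that lifecycle (ToyBundle is
illustrative and omits the PCollection type assertion):

class ToyBundle(object):
  def __init__(self):
    self._elements = []
    self._committed = False

  @property
  def elements(self):
    # Uncommitted bundles hand out a copy so callers cannot observe later
    # mutations; committed bundles are already immutable tuples.
    return self._elements if self._committed else list(self._elements)

  def add(self, element):
    assert not self._committed
    self._elements.append(element)

  def commit(self):
    assert not self._committed
    self._committed = True
    self._elements = tuple(self._elements)

bundle = ToyBundle()
bundle.add('a')
bundle.add('b')
bundle.commit()          # the bundle is now immutable
print(bundle.elements)   # ('a', 'b')
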
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/clock.py b/sdks/python/apache_beam/runners/inprocess/clock.py
deleted file mode 100644
index 11e49cd..0000000
--- a/sdks/python/apache_beam/runners/inprocess/clock.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Clock implementations for real time processing and testing."""
-
-from __future__ import absolute_import
-
-import time
-
-
-class Clock(object):
-
-  @property
-  def now(self):
-    """Returns the number of milliseconds since epoch."""
-    return int(time.time() * 1000)
-
-
-class MockClock(Clock):
-  """Mock clock implementation for testing."""
-
-  def __init__(self, now_in_ms):
-    self._now_in_ms = now_in_ms
-
-  @property
-  def now(self):
-    return self._now_in_ms
-
-  @now.setter
-  def now(self, value_in_ms):
-    assert value_in_ms >= self._now_in_ms
-    self._now_in_ms = value_in_ms
-
-  def advance(self, duration_in_ms):
-    assert duration_in_ms >= 0
-    self._now_in_ms += duration_in_ms

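MockClock above lets tests drive processing time deterministically instead of
reading time.time(). A short standalone sketch with the same interface, showing
typical test usage:

class MockClock(object):
  def __init__(self, now_in_ms):
    self._now_in_ms = now_in_ms

  @property
  def now(self):
    return self._now_in_ms

  def advance(self, duration_in_ms):
    assert duration_in_ms >= 0
    self._now_in_ms += duration_in_ms

clock = MockClock(1000)
assert clock.now == 1000
clock.advance(500)       # move processing time forward deterministically
assert clock.now == 1500
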
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
deleted file mode 100644
index 6f1757a..0000000
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""ConsumerTrackingPipelineVisitor, a PipelineVisitor object."""
-
-from __future__ import absolute_import
-
-from apache_beam import pvalue
-from apache_beam.pipeline import PipelineVisitor
-
-
-class ConsumerTrackingPipelineVisitor(PipelineVisitor):
-  """Visitor for extracting value-consumer relations from the graph.
-
-  Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This
-  is used to schedule consuming PTransforms to consume input after the upstream
-  transform has produced and committed output.
-  """
-
-  def __init__(self):
-    self.value_to_consumers = {}  # Map from PValue to [AppliedPTransform].
-    self.root_transforms = set()  # set of (root) AppliedPTransforms.
-    self.views = []               # list of PCollectionViews.
-    self.step_names = {}          # Map from AppliedPTransform to String.
-
-    self._num_transforms = 0
-
-  def visit_value(self, value, producer_node):
-    if value:
-      if isinstance(value, pvalue.PCollectionView):
-        self.views.append(value)
-
-  def visit_transform(self, applied_ptransform):
-    inputs = applied_ptransform.inputs
-    if inputs:
-      for input_value in inputs:
-        if isinstance(input_value, pvalue.PBegin):
-          self.root_transforms.add(applied_ptransform)
-        if input_value not in self.value_to_consumers:
-          self.value_to_consumers[input_value] = []
-        self.value_to_consumers[input_value].append(applied_ptransform)
-    else:
-      self.root_transforms.add(applied_ptransform)
-    self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
-    self._num_transforms += 1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
deleted file mode 100644
index 3cd8d73..0000000
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for consumer_tracking_pipeline_visitor."""
-
-import logging
-import unittest
-
-from apache_beam import pvalue
-from apache_beam.io import Read
-from apache_beam.io import TextFileSource
-from apache_beam.pipeline import Pipeline
-from apache_beam.pvalue import AsList
-from apache_beam.runners.inprocess import InProcessPipelineRunner
-from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
-from apache_beam.transforms import CoGroupByKey
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import Flatten
-from apache_beam.transforms import ParDo
-
-# Disable frequent lint warning due to pipe operator for chaining transforms.
-# pylint: disable=expression-not-assigned
-# pylint: disable=pointless-statement
-
-
-class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
-
-  def setUp(self):
-    self.pipeline = Pipeline(InProcessPipelineRunner())
-    self.visitor = ConsumerTrackingPipelineVisitor()
-
-  def test_root_transforms(self):
-    root_create = Create('create', [[1, 2, 3]])
-    root_read = Read('read', TextFileSource('/tmp/somefile'))
-    root_flatten = Flatten('flatten', pipeline=self.pipeline)
-
-    pbegin = pvalue.PBegin(self.pipeline)
-    pcoll_create = pbegin | root_create
-    pbegin | root_read
-    pcoll_create | FlatMap(lambda x: x)
-    [] | root_flatten
-
-    self.pipeline.visit(self.visitor)
-
-    root_transforms = sorted(
-        [t.transform for t in self.visitor.root_transforms])
-    self.assertEqual(root_transforms, sorted(
-        [root_read, root_create, root_flatten]))
-
-    pbegin_consumers = sorted(
-        [c.transform for c in self.visitor.value_to_consumers[pbegin]])
-    self.assertEqual(pbegin_consumers, sorted([root_read, root_create]))
-    self.assertEqual(len(self.visitor.step_names), 4)
-
-  def test_side_inputs(self):
-
-    class SplitNumbersFn(DoFn):
-
-      def process(self, context):
-        if context.element < 0:
-          yield pvalue.SideOutputValue('tag_negative', context.element)
-        else:
-          yield context.element
-
-    class ProcessNumbersFn(DoFn):
-
-      def process(self, context, negatives):
-        yield context.element
-
-    root_create = Create('create', [[-1, 2, 3]])
-
-    result = (self.pipeline
-              | root_create
-              | ParDo(SplitNumbersFn()).with_outputs('tag_negative',
-                                                     main='positive'))
-    positive, negative = result
-    positive | ParDo(ProcessNumbersFn(), AsList(negative))
-
-    self.pipeline.visit(self.visitor)
-
-    root_transforms = sorted(
-        [t.transform for t in self.visitor.root_transforms])
-    self.assertEqual(root_transforms, sorted([root_create]))
-    self.assertEqual(len(self.visitor.step_names), 4)
-    self.assertEqual(len(self.visitor.views), 1)
-    self.assertTrue(isinstance(self.visitor.views[0],
-                               pvalue.ListPCollectionView))
-
-  def test_co_group_by_key(self):
-    emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
-    phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
-    {'emails': emails, 'phones': phones} | CoGroupByKey()
-
-    self.pipeline.visit(self.visitor)
-
-    root_transforms = sorted(
-        [t.transform for t in self.visitor.root_transforms])
-    self.assertEqual(len(root_transforms), 2)
-    self.assertGreater(
-        len(self.visitor.step_names), 3)  # 2 creates + expanded CoGBK
-    self.assertEqual(len(self.visitor.views), 0)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.DEBUG)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
deleted file mode 100644
index 7af1608..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ /dev/null
@@ -1,272 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""InProcessEvaluationContext tracks global state, triggers and watermarks."""
-
-from __future__ import absolute_import
-
-import collections
-import threading
-
-from apache_beam.transforms import sideinputs
-from apache_beam.runners.inprocess.clock import Clock
-from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
-from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
-from apache_beam.utils import counters
-
-
-class _InProcessExecutionContext(object):
-
-  def __init__(self, watermarks, existing_state):
-    self._watermarks = watermarks
-    self._existing_state = existing_state
-
-  @property
-  def watermarks(self):
-    return self._watermarks
-
-  @property
-  def existing_state(self):
-    return self._existing_state
-
-
-class _InProcessSideInputView(object):
-
-  def __init__(self, view):
-    self._view = view
-    self.callable_queue = collections.deque()
-    self.value = None
-    self.has_result = False
-
-
-class _InProcessSideInputsContainer(object):
-  """An in-process container for PCollectionViews.
-
-  It provides methods for blocking until a side input is available and for
-  writing to a side input.
-  """
-
-  def __init__(self, views):
-    self._lock = threading.Lock()
-    self._views = {}
-    for view in views:
-      self._views[view] = _InProcessSideInputView(view)
-
-  def get_value_or_schedule_after_output(self, pcollection_view, task):
-    with self._lock:
-      view = self._views[pcollection_view]
-      if not view.has_result:
-        view.callable_queue.append(task)
-        task.blocked = True
-      return (view.has_result, view.value)
-
-  def set_value_and_get_callables(self, pcollection_view, values):
-    with self._lock:
-      view = self._views[pcollection_view]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(pcollection_view, values)
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def _pvalue_to_value(self, view, values):
-    """Given a PCollectionView, returns the associated value in requested form.
-
-    Args:
-      view: PCollectionView for the requested side input.
-      values: Iterable values associated with the side input.
-
-    Returns:
-      The side input in its requested form.
-
-    Raises:
-      ValueError: If values cannot be converted into the requested form.
-    """
-    return sideinputs.SideInputMap(type(view), view._view_options(), values)
-
-
-class InProcessEvaluationContext(object):
-  """Evaluation context with the global state information of the pipeline.
-
-  The evaluation context for a specific pipeline being executed by the
-  InProcessPipelineRunner. Contains state shared within the execution across all
-  transforms.
-
-  InProcessEvaluationContext contains shared state for an execution of the
-  InProcessPipelineRunner that can be used while evaluating a PTransform. This
-  consists of views into underlying state and watermark implementations, access
-  to read and write PCollectionViews, and constructing counter sets and
-  execution contexts. This includes executing callbacks asynchronously when
-  state changes to the appropriate point (e.g. when a PCollectionView is
-  requested and known to be empty).
-
-  InProcessEvaluationContext also handles results by committing finalizing
-  bundles based on the current global state and updating the global state
-  appropriately. This includes updating the per-(step,key) state, updating
-  global watermarks, and executing any callbacks that can be executed.
-  """
-
-  def __init__(self, pipeline_options, bundle_factory, root_transforms,
-               value_to_consumers, step_names, views):
-    self.pipeline_options = pipeline_options
-    self._bundle_factory = bundle_factory
-    self._root_transforms = root_transforms
-    self._value_to_consumers = value_to_consumers
-    self._step_names = step_names
-    self.views = views
-
-    # AppliedPTransform -> Evaluator specific state objects
-    self._application_state_internals = {}
-    self._watermark_manager = InProcessWatermarkManager(
-        Clock(), root_transforms, value_to_consumers)
-    self._side_inputs_container = _InProcessSideInputsContainer(views)
-    self._pending_unblocked_tasks = []
-    self._counter_factory = counters.CounterFactory()
-    self._cache = None
-
-    self._lock = threading.Lock()
-
-  def use_pvalue_cache(self, cache):
-    assert not self._cache
-    self._cache = cache
-
-  @property
-  def has_cache(self):
-    return self._cache is not None
-
-  def append_to_cache(self, applied_ptransform, tag, elements):
-    with self._lock:
-      assert self._cache
-      self._cache.append(applied_ptransform, tag, elements)
-
-  def is_root_transform(self, applied_ptransform):
-    return applied_ptransform in self._root_transforms
-
-  def handle_result(
-      self, completed_bundle, completed_timers, result):
-    """Handle the provided result produced after evaluating the input bundle.
-
-    Handle the provided InProcessTransformResult, produced after evaluating
-    the provided committed bundle (potentially None, if the result of a root
-    PTransform).
-
-    The result is the output of running the transform contained in the
-    InProcessTransformResult on the contents of the provided bundle.
-
-    Args:
-      completed_bundle: the bundle that was processed to produce the result.
-      completed_timers: the timers that were delivered to produce the
-                        completed_bundle.
-      result: the InProcessTransformResult of evaluating the input bundle
-
-    Returns:
-      the committed bundles contained within the handled result.
-    """
-    with self._lock:
-      committed_bundles = self._commit_bundles(result.output_bundles)
-      self._watermark_manager.update_watermarks(
-          completed_bundle, result.transform, completed_timers,
-          committed_bundles, result.watermark_hold)
-
-      # If the result is for a view, update side inputs container.
-      if (result.output_bundles
-          and result.output_bundles[0].pcollection in self.views):
-        if committed_bundles:
-          assert len(committed_bundles) == 1
-          side_input_result = committed_bundles[0].elements
-        else:
-          side_input_result = []
-        tasks = self._side_inputs_container.set_value_and_get_callables(
-            result.output_bundles[0].pcollection, side_input_result)
-        self._pending_unblocked_tasks.extend(tasks)
-
-      if result.counters:
-        for counter in result.counters:
-          merged_counter = self._counter_factory.get_counter(
-              counter.name, counter.combine_fn)
-          merged_counter.accumulator.merge([counter.accumulator])
-
-      self._application_state_internals[result.transform] = result.state
-      return committed_bundles
-
-  def get_aggregator_values(self, aggregator_or_name):
-    return self._counter_factory.get_aggregator_values(aggregator_or_name)
-
-  def schedule_pending_unblocked_tasks(self, executor_service):
-    if self._pending_unblocked_tasks:
-      with self._lock:
-        for task in self._pending_unblocked_tasks:
-          executor_service.submit(task)
-        self._pending_unblocked_tasks = []
-
-  def _commit_bundles(self, uncommitted_bundles):
-    """Commits bundles and returns a immutable set of committed bundles."""
-    for in_progress_bundle in uncommitted_bundles:
-      producing_applied_ptransform = in_progress_bundle.pcollection.producer
-      watermarks = self._watermark_manager.get_watermarks(
-          producing_applied_ptransform)
-      in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
-    return tuple(uncommitted_bundles)
-
-  def get_execution_context(self, applied_ptransform):
-    return _InProcessExecutionContext(
-        self._watermark_manager.get_watermarks(applied_ptransform),
-        self._application_state_internals.get(applied_ptransform))
-
-  def create_bundle(self, output_pcollection):
-    """Create an uncommitted bundle for the specified PCollection."""
-    return self._bundle_factory.create_bundle(output_pcollection)
-
-  def create_empty_committed_bundle(self, output_pcollection):
-    """Create empty bundle useful for triggering evaluation."""
-    return self._bundle_factory.create_empty_committed_bundle(
-        output_pcollection)
-
-  def extract_fired_timers(self):
-    return self._watermark_manager.extract_fired_timers()
-
-  def is_done(self, transform=None):
-    """Checks completion of a step or the pipeline.
-
-    Args:
-      transform: AppliedPTransform to check for completion.
-
-    Returns:
-      True if the step will not produce additional output. If transform is
-      None, returns True if all steps are done.
-    """
-    if transform:
-      return self._is_transform_done(transform)
-    else:
-      for applied_ptransform in self._step_names:
-        if not self._is_transform_done(applied_ptransform):
-          return False
-      return True
-
-  def _is_transform_done(self, transform):
-    tw = self._watermark_manager.get_watermarks(transform)
-    return tw.output_watermark == InProcessWatermarkManager.WATERMARK_POS_INF
-
-  def get_value_or_schedule_after_output(self, pcollection_view, task):
-    assert isinstance(task, TransformExecutor)
-    return self._side_inputs_container.get_value_or_schedule_after_output(
-        pcollection_view, task)

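The side-input handshake implemented by _InProcessSideInputsContainer above is
worth spelling out: a consumer that asks for a side input before it is ready is
queued, and writing the value releases every queued task in one batch. A
minimal standalone sketch of that handshake (ToySideInputSlot is an
illustrative stand-in for _InProcessSideInputView plus the container logic):

import collections
import threading

class ToySideInputSlot(object):
  def __init__(self):
    self._lock = threading.Lock()
    self.value = None
    self.has_result = False
    self.callable_queue = collections.deque()

  def get_value_or_schedule(self, task):
    with self._lock:
      if not self.has_result:
        self.callable_queue.append(task)  # re-run the task once ready
      return (self.has_result, self.value)

  def set_value_and_get_callables(self, value):
    with self._lock:
      assert not self.has_result
      self.value = value
      self.has_result = True
      blocked = tuple(self.callable_queue)
      self.callable_queue = None
      return blocked  # the caller reschedules these tasks

slot = ToySideInputSlot()
ready, _ = slot.get_value_or_schedule(lambda: None)  # not ready: task queued
assert not ready
blocked = slot.set_value_and_get_callables([1, 2, 3])
assert len(blocked) == 1  # the queued task is released for rescheduling
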
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py b/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
deleted file mode 100644
index 2136855..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
+++ /dev/null
@@ -1,550 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""An executor that schedules and executes applied ptransforms."""
-
-from __future__ import absolute_import
-
-import collections
-import logging
-import Queue
-import threading
-import traceback
-from weakref import WeakValueDictionary
-
-
-class ExecutorService(object):
-  """Thread pool for executing tasks in parallel."""
-
-  class CallableTask(object):
-
-    def __call__(self):
-      pass
-
-    @property
-    def name(self):
-      return None
-
-  class ExecutorServiceWorker(threading.Thread):
-    """Worker thread for executing a single task at a time."""
-
-    # Time in seconds to block while waiting to get an item from the queue.
-    TIMEOUT = 5
-
-    def __init__(self, queue, index):
-      super(ExecutorService.ExecutorServiceWorker, self).__init__()
-      self.queue = queue
-      self._index = index
-      self._default_name = 'ExecutorServiceWorker-' + str(index)
-      self._update_name()
-      self.shutdown_requested = False
-      self.start()
-
-    def _update_name(self, task=None):
-      if task and task.name:
-        name = task.name
-      else:
-        name = self._default_name
-      self.name = 'Thread: %d, %s (%s)' % (
-          self._index, name, 'executing' if task else 'idle')
-
-    def _get_task_or_none(self):
-      try:
-        # Do not block indefinitely, otherwise we may not act for a requested
-        # shutdown.
-        return self.queue.get(
-            timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT)
-      except Queue.Empty:
-        return None
-
-    def run(self):
-      while not self.shutdown_requested:
-        task = self._get_task_or_none()
-        if task:
-          try:
-            if not self.shutdown_requested:
-              self._update_name(task)
-              task()
-              self._update_name()
-          finally:
-            self.queue.task_done()
-
-    def shutdown(self):
-      self.shutdown_requested = True
-
-  def __init__(self, num_workers):
-    self.queue = Queue.Queue()
-    self.workers = [ExecutorService.ExecutorServiceWorker(
-        self.queue, i) for i in range(num_workers)]
-    self.shutdown_requested = False
-
-  def submit(self, task):
-    assert isinstance(task, ExecutorService.CallableTask)
-    if not self.shutdown_requested:
-      self.queue.put(task)
-
-  def await_completion(self):
-    for worker in self.workers:
-      worker.join()
-
-  def shutdown(self):
-    self.shutdown_requested = True
-
-    for worker in self.workers:
-      worker.shutdown()
-
-    # Consume all the remaining items in the queue
-    while not self.queue.empty():
-      try:
-        self.queue.get_nowait()
-        self.queue.task_done()
-      except Queue.Empty:
-        continue
-    # All existing threads will eventually terminate (after they complete their
-    # last task).
-
-
-class TransformEvaluationState(object):
-
-  def __init__(self, executor_service, scheduled):
-    self.executor_service = executor_service
-    self.scheduled = scheduled
-
-  def schedule(self, work):
-    self.scheduled.add(work)
-    self.executor_service.submit(work)
-
-  def complete(self, completed_work):
-    self.scheduled.remove(completed_work)
-
-
-class ParallelEvaluationState(TransformEvaluationState):
-  """A TransformEvaluationState with unlimited parallelism.
-
-  Any TransformExecutor scheduled will be immediately submitted to the
-  ExecutorService.
-
-  A principal use of this is for evaluators that can generate output bundles
-  only using the input bundle (e.g. ParDo).
-  """
-  pass
-
-
-class SerialEvaluationState(TransformEvaluationState):
-  """A TransformEvaluationState with a single work queue.
-
-  Any TransformExecutor scheduled will be placed on the work queue. Only one
-  item of work will be submitted to the ExecutorService at any time.
-
-  A principal use of this is for evaluators that keep global state, such as
-  GroupByKeyOnly.
-  """
-
-  def __init__(self, executor_service, scheduled):
-    super(SerialEvaluationState, self).__init__(executor_service, scheduled)
-    self.serial_queue = collections.deque()
-    self.currently_evaluating = None
-    self._lock = threading.Lock()
-
-  def complete(self, completed_work):
-    self._update_currently_evaluating(None, completed_work)
-    super(SerialEvaluationState, self).complete(completed_work)
-
-  def schedule(self, new_work):
-    self._update_currently_evaluating(new_work, None)
-
-  def _update_currently_evaluating(self, new_work, completed_work):
-    with self._lock:
-      if new_work:
-        self.serial_queue.append(new_work)
-      if completed_work:
-        assert self.currently_evaluating == completed_work
-        self.currently_evaluating = None
-      if self.serial_queue and not self.currently_evaluating:
-        next_work = self.serial_queue.pop()
-        self.currently_evaluating = next_work
-        super(SerialEvaluationState, self).schedule(next_work)
-
-
-class TransformExecutorServices(object):
-  """Schedules and completes TransformExecutors.
-
-  Controls the concurrency as appropriate for the applied transform that each
-  executor serves.
-  """
-
-  def __init__(self, executor_service):
-    self._executor_service = executor_service
-    self._scheduled = set()
-    self._parallel = ParallelEvaluationState(
-        self._executor_service, self._scheduled)
-    self._serial_cache = WeakValueDictionary()
-
-  def parallel(self):
-    return self._parallel
-
-  def serial(self, step):
-    cached = self._serial_cache.get(step)
-    if not cached:
-      cached = SerialEvaluationState(self._executor_service, self._scheduled)
-      self._serial_cache[step] = cached
-    return cached
-
-  @property
-  def executors(self):
-    return frozenset(self._scheduled)
-
-
-class _CompletionCallback(object):
-  """The default completion callback.
-
-  The default completion callback is used to complete transform evaluations
-  that are triggered due to the arrival of elements from an upstream transform,
-  or for a source transform.
-  """
-
-  def __init__(self, evaluation_context, all_updates, timers=None):
-    self._evaluation_context = evaluation_context
-    self._all_updates = all_updates
-    self._timers = timers
-
-  def handle_result(self, input_committed_bundle, transform_result):
-    output_committed_bundles = self._evaluation_context.handle_result(
-        input_committed_bundle, self._timers, transform_result)
-    for output_committed_bundle in output_committed_bundles:
-      self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate(
-          output_committed_bundle, None))
-    return output_committed_bundles
-
-  def handle_exception(self, exception):
-    self._all_updates.offer(
-        _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
-
-
-class _TimerCompletionCallback(_CompletionCallback):
-
-  def __init__(self, evaluation_context, all_updates, timers):
-    super(_TimerCompletionCallback, self).__init__(
-        evaluation_context, all_updates, timers)
-
-
-class TransformExecutor(ExecutorService.CallableTask):
-  """TransformExecutor will evaluate a bundle using an applied ptransform.
-
-  A CallableTask responsible for constructing a TransformEvaluator and evaluating
-  it on some bundle of input, and registering the result using the completion
-  callback.
-  """
-
-  def __init__(self, transform_evaluator_registry, evaluation_context,
-               input_bundle, applied_transform, completion_callback,
-               transform_evaluation_state):
-    self._transform_evaluator_registry = transform_evaluator_registry
-    self._evaluation_context = evaluation_context
-    self._input_bundle = input_bundle
-    self._applied_transform = applied_transform
-    self._completion_callback = completion_callback
-    self._transform_evaluation_state = transform_evaluation_state
-    self._side_input_values = {}
-    self.blocked = False
-    self._call_count = 0
-
-  def __call__(self):
-    self._call_count += 1
-    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
-
-    for side_input in self._applied_transform.side_inputs:
-      if side_input not in self._side_input_values:
-        has_result, value = (
-            self._evaluation_context.get_value_or_schedule_after_output(
-                side_input, self))
-        if not has_result:
-          # Monitor task will reschedule this executor once the side input is
-          # available.
-          return
-        self._side_input_values[side_input] = value
-
-    side_input_values = [self._side_input_values[side_input]
-                         for side_input in self._applied_transform.side_inputs]
-
-    try:
-      evaluator = self._transform_evaluator_registry.for_application(
-          self._applied_transform, self._input_bundle, side_input_values)
-
-      if self._input_bundle:
-        for value in self._input_bundle.elements:
-          evaluator.process_element(value)
-
-      result = evaluator.finish_bundle()
-
-      if self._evaluation_context.has_cache:
-        for uncommitted_bundle in result.output_bundles:
-          self._evaluation_context.append_to_cache(
-              self._applied_transform, uncommitted_bundle.tag,
-              uncommitted_bundle.elements)
-        undeclared_tag_values = result.undeclared_tag_values
-        if undeclared_tag_values:
-          for tag, value in undeclared_tag_values.iteritems():
-            self._evaluation_context.append_to_cache(
-                self._applied_transform, tag, value)
-
-      self._completion_callback.handle_result(self._input_bundle, result)
-      return result
-    except Exception as e:  # pylint: disable=broad-except
-      logging.warning('Task failed: %s', traceback.format_exc())
-      self._completion_callback.handle_exception(e)
-    finally:
-      self._transform_evaluation_state.complete(self)
-
-
-class InProcessExecutor(object):
-  """An executor that delegates to _ExecutorServiceParallelExecutor."""
-
-  def __init__(self, *args, **kwargs):
-    self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs)
-
-  def start(self, roots):
-    self._executor.start(roots)
-
-  def await_completion(self):
-    self._executor.await_completion()
-
-
-class _ExecutorServiceParallelExecutor(object):
-  """An internal implementation for InProcessExecutor."""
-
-  NUM_WORKERS = 1
-
-  def __init__(self, value_to_consumers, transform_evaluator_registry,
-               evaluation_context):
-    self.executor_service = ExecutorService(
-        _ExecutorServiceParallelExecutor.NUM_WORKERS)
-    self.transform_executor_services = TransformExecutorServices(
-        self.executor_service)
-    self.value_to_consumers = value_to_consumers
-    self.transform_evaluator_registry = transform_evaluator_registry
-    self.evaluation_context = evaluation_context
-    self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
-        _ExecutorServiceParallelExecutor.ExecutorUpdate)
-    self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
-        _ExecutorServiceParallelExecutor.VisibleExecutorUpdate)
-    self.default_completion_callback = _CompletionCallback(
-        evaluation_context, self.all_updates)
-
-  def start(self, roots):
-    self.root_nodes = frozenset(roots)
-    self.executor_service.submit(
-        _ExecutorServiceParallelExecutor._MonitorTask(self))
-
-  def await_completion(self):
-    update = self.visible_updates.take()
-    try:
-      if update.exception:
-        raise update.exception
-    finally:
-      self.executor_service.shutdown()
-
-  def schedule_consumers(self, committed_bundle):
-    if committed_bundle.pcollection in self.value_to_consumers:
-      consumers = self.value_to_consumers[committed_bundle.pcollection]
-      for applied_ptransform in consumers:
-        self.schedule_consumption(applied_ptransform, committed_bundle,
-                                  self.default_completion_callback)
-
-  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
-                           on_complete):
-    """Schedules evaluation of the given bundle with the transform."""
-    assert all([consumer_applied_transform, on_complete])
-    assert committed_bundle or consumer_applied_transform in self.root_nodes
-    if (committed_bundle
-        and self.transform_evaluator_registry.should_execute_serially(
-            consumer_applied_transform)):
-      transform_executor_service = self.transform_executor_services.serial(
-          consumer_applied_transform)
-    else:
-      transform_executor_service = self.transform_executor_services.parallel()
-
-    transform_executor = TransformExecutor(
-        self.transform_evaluator_registry, self.evaluation_context,
-        committed_bundle, consumer_applied_transform, on_complete,
-        transform_executor_service)
-    transform_executor_service.schedule(transform_executor)
-
-  class _TypedUpdateQueue(object):
-    """Type checking update queue with blocking and non-blocking operations."""
-
-    def __init__(self, item_type):
-      self._item_type = item_type
-      self._queue = Queue.Queue()
-
-    def poll(self):
-      try:
-        item = self._queue.get_nowait()
-        self._queue.task_done()
-        return item
-      except Queue.Empty:
-        return None
-
-    def take(self):
-      item = self._queue.get()
-      self._queue.task_done()
-      return item
-
-    def offer(self, item):
-      assert isinstance(item, self._item_type)
-      self._queue.put_nowait(item)
-
-  class ExecutorUpdate(object):
-    """An internal status update on the state of the executor."""
-
-    def __init__(self, produced_bundle=None, exception=None):
-      # Exactly one of these must be non-None.
-      assert bool(produced_bundle) != bool(exception)
-      self.committed_bundle = produced_bundle
-      self.exception = exception
-
-  class VisibleExecutorUpdate(object):
-    """An update of interest to the user.
-
-    Used while awaiting completion to decide whether to return normally or
-    raise an exception.
-    """
-
-    def __init__(self, exception=None):
-      self.finished = exception is not None
-      self.exception = exception
-
-  class _MonitorTask(ExecutorService.CallableTask):
-    """MonitorTask continuously runs to ensure that pipeline makes progress."""
-
-    def __init__(self, executor):
-      self._executor = executor
-
-    @property
-    def name(self):
-      return 'monitor'
-
-    def __call__(self):
-      try:
-        update = self._executor.all_updates.poll()
-        while update:
-          if update.committed_bundle:
-            self._executor.schedule_consumers(update.committed_bundle)
-          else:
-            assert update.exception
-            logging.warning('A task failed with exception.\n %s',
-                            update.exception)
-            self._executor.visible_updates.offer(
-                _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-                    update.exception))
-          update = self._executor.all_updates.poll()
-        self._executor.evaluation_context.schedule_pending_unblocked_tasks(
-            self._executor.executor_service)
-        self._add_work_if_necessary(self._fire_timers())
-      except Exception as e:  # pylint: disable=broad-except
-        logging.error('Monitor task died due to exception.\n %s', e)
-        self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
-      finally:
-        if not self._should_shutdown():
-          self._executor.executor_service.submit(self)
-
-    def _should_shutdown(self):
-      """Checks whether the pipeline has reached a terminal state.
-
-      It checks for successful completion by inspecting the watermarks of all
-      transforms; if they have all reached the maximum watermark, the pipeline
-      has completed successfully.
-
-      Otherwise, it checks that at least one executor is making progress; if
-      none is, the pipeline is declared stalled.
-
-      If the pipeline has reached a terminal state as described above,
-      _should_shutdown requests the executor to shut down gracefully.
-
-      Returns:
-        True if the pipeline reached a terminal state and the monitor task can
-        finish. Otherwise the monitor task should schedule itself again for
-        future execution.
-      """
-      if self._executor.evaluation_context.is_done():
-        self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
-        self._executor.executor_service.shutdown()
-        return True
-      elif not self._is_executing():
-        self._executor.visible_updates.offer(
-            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
-                Exception('Monitor task detected a pipeline stall.')))
-        self._executor.executor_service.shutdown()
-        return True
-      return False
-
-    def _fire_timers(self):
-      """Schedules triggered consumers if any timers fired.
-
-      Returns:
-        True if timers fired.
-      """
-      fired_timers = self._executor.evaluation_context.extract_fired_timers()
-      for applied_ptransform in fired_timers:
-        # Use an empty committed bundle just to trigger firing.
-        empty_bundle = (
-            self._executor.evaluation_context.create_empty_committed_bundle(
-                applied_ptransform.inputs[0]))
-        timer_completion_callback = _TimerCompletionCallback(
-            self._executor.evaluation_context, self._executor.all_updates,
-            applied_ptransform)
-
-        self._executor.schedule_consumption(
-            applied_ptransform, empty_bundle, timer_completion_callback)
-      return bool(fired_timers)
-
-    def _is_executing(self):
-      """Returns True if there is at least one non-blocked TransformExecutor."""
-      for transform_executor in (
-          self._executor.transform_executor_services.executors):
-        if not transform_executor.blocked:
-          return True
-      return False
-
-    def _add_work_if_necessary(self, timers_fired):
-      """Adds more work from the roots if pipeline requires more input.
-
-      If all active TransformExecutors are in a blocked state, add more work
-      from root nodes that may have additional work. This ensures that if a
-      pipeline has elements available from the root nodes it will add those
-      elements when necessary.
-
-      Args:
-        timers_fired: True if any timers fired prior to this call.
-      """
-      # If any timers have fired, they will add more work; no need to add more.
-      if timers_fired:
-        return
-
-      if self._is_executing():
-        # We have at least one executor that can proceed without adding
-        # additional work.
-        return
-
-      # All current TransformExecutors are blocked; add more work from the
-      # roots.
-      for applied_transform in self._executor.root_nodes:
-        if not self._executor.evaluation_context.is_done(applied_transform):
-          self._executor.schedule_consumption(
-              applied_transform, None,
-              self._executor.default_completion_callback)

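The SerialEvaluationState deleted above reduces to a reusable concurrency
pattern: a lock-guarded deque of pending work from which at most one item is
submitted to the thread pool at a time, with each completion pulling in the
next item. A minimal standalone sketch of that pattern, written in Python 3
with illustrative names (SerialScheduler and its pool argument are not part
of Beam):

  import collections
  import threading
  import time
  from concurrent.futures import ThreadPoolExecutor

  class SerialScheduler(object):
    """Submits at most one queued task to the underlying pool at a time."""

    def __init__(self, pool):
      self._pool = pool
      self._queue = collections.deque()
      self._running = False
      self._lock = threading.Lock()

    def schedule(self, fn):
      self._transition(new_work=fn)

    def _transition(self, new_work=None, done=False):
      with self._lock:
        if new_work is not None:
          self._queue.append(new_work)
        if done:
          self._running = False
        if self._queue and not self._running:
          self._running = True
          self._pool.submit(self._run, self._queue.popleft())

    def _run(self, fn):
      try:
        fn()
      finally:
        # Completing a task gives the queue a chance to submit the next one.
        self._transition(done=True)

  pool = ThreadPoolExecutor(max_workers=4)
  scheduler = SerialScheduler(pool)
  for i in range(3):
    scheduler.schedule(lambda i=i: print('task %d' % i))
  time.sleep(0.5)  # crude wait for the demo; real code would track completion
  pool.shutdown(wait=True)

Note that, unlike the deque.pop() call in _update_currently_evaluating above,
this sketch drains the queue FIFO with popleft(); the serial state only
guarantees one-at-a-time execution, not a particular order.
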
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
deleted file mode 100644
index 287c170..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""InProcessPipelineRunner, executing on the local machine."""
-
-from __future__ import absolute_import
-
-import collections
-import logging
-
-from apache_beam.runners.inprocess.bundle_factory import BundleFactory
-from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
-from apache_beam.runners.inprocess.inprocess_evaluation_context import InProcessEvaluationContext
-from apache_beam.runners.inprocess.inprocess_executor import InProcessExecutor
-from apache_beam.runners.inprocess.transform_evaluator import TransformEvaluatorRegistry
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-
-
-class InProcessPipelineRunner(PipelineRunner):
-  """Executes a single pipeline on the local machine."""
-
-  def __init__(self):
-    self._cache = None
-
-  def run(self, pipeline):
-    """Execute the entire pipeline and returns an InProcessPipelineResult."""
-    logging.info('Running pipeline with InProcessPipelineRunner.')
-    self.visitor = ConsumerTrackingPipelineVisitor()
-    pipeline.visit(self.visitor)
-
-    evaluation_context = InProcessEvaluationContext(
-        pipeline.options,
-        BundleFactory(),
-        self.visitor.root_transforms,
-        self.visitor.value_to_consumers,
-        self.visitor.step_names,
-        self.visitor.views)
-
-    evaluation_context.use_pvalue_cache(self._cache)
-
-    executor = InProcessExecutor(self.visitor.value_to_consumers,
-                                 TransformEvaluatorRegistry(evaluation_context),
-                                 evaluation_context)
-    # Start the executor. This is a non-blocking call; it starts execution in
-    # background threads and returns immediately.
-    executor.start(self.visitor.root_transforms)
-    result = InProcessPipelineResult(executor, evaluation_context)
-
-    # TODO(altay): If blocking:
-    # Block until the pipeline completes. This call will return after the
-    # pipeline was fully terminated (successfully or with a failure).
-    result.await_completion()
-
-    if self._cache:
-      self._cache.finalize()
-
-    return result
-
-  @property
-  def cache(self):
-    if not self._cache:
-      self._cache = InProcessBufferingInMemoryCache()
-    return self._cache.pvalue_cache
-
-  def apply(self, transform, input):  # pylint: disable=redefined-builtin
-    """Runner callback for a pipeline.apply call."""
-    return transform.apply(input)
-
-
-class InProcessBufferingInMemoryCache(object):
-  """PValueCache wrapper for buffering bundles until a PValue is fully computed.
-
-  InProcessBufferingInMemoryCache keeps an in memory cache of
-  (applied_ptransform, tag) tuples. It accepts appending to existing cache
-  entries until it is finalized. finalize() will make all the existing cached
-  entries visible to the underyling PValueCache in their entirety, clean the in
-  memory cache and stop accepting new cache entries.
-  """
-
-  def __init__(self):
-    self._cache = collections.defaultdict(list)
-    self._pvalue_cache = PValueCache()
-    self._finalized = False
-
-  @property
-  def pvalue_cache(self):
-    return self._pvalue_cache
-
-  def append(self, applied_ptransform, tag, elements):
-    assert not self._finalized
-    assert elements is not None
-    self._cache[(applied_ptransform, tag)].extend(elements)
-
-  def finalize(self):
-    """Make buffered cache elements visible to the underlying PValueCache."""
-    assert not self._finalized
-    for key, value in self._cache.iteritems():
-      applied_ptransform, tag = key
-      self._pvalue_cache.cache_output(applied_ptransform, tag, value)
-    # Clear the buffer and refuse further appends only after every entry has
-    # been pushed to the underlying cache.
-    self._cache = None
-    self._finalized = True
-
-
-class InProcessPipelineResult(PipelineResult):
-  """A InProcessPipelineResult provides access to info about a pipeline."""
-
-  def __init__(self, executor, evaluation_context):
-    super(InProcessPipelineResult, self).__init__(PipelineState.RUNNING)
-    self._executor = executor
-    self._evaluation_context = evaluation_context
-
-  def _is_in_terminal_state(self):
-    return self._state is not PipelineState.RUNNING
-
-  def await_completion(self):
-    if not self._is_in_terminal_state():
-      try:
-        self._executor.await_completion()
-        self._state = PipelineState.DONE
-      except:  # pylint: disable=broad-except
-        self._state = PipelineState.FAILED
-        raise
-    return self._state
-
-  def aggregated_values(self, aggregator_or_name):
-    return self._evaluation_context.get_aggregator_values(aggregator_or_name)

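For context, the deleted runner was used like any other PipelineRunner: pass
an instance to the Pipeline constructor and call run(), which blocks until
the pipeline terminates (per the TODO above, blocking is currently
unconditional). A minimal usage sketch against the pre-rename import path,
with illustrative labels and elements:

  import apache_beam as beam
  from apache_beam import Pipeline
  from apache_beam.runners.inprocess.inprocess_runner import (
      InProcessPipelineRunner)

  p = Pipeline(runner=InProcessPipelineRunner())
  (p
   | 'Create' >> beam.Create([1, 2, 3])
   | 'Double' >> beam.Map(lambda x: x * 2))
  result = p.run()  # returns only after await_completion() has finished
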
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
deleted file mode 100644
index aa9db24..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
+++ /dev/null
@@ -1,121 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for InProcessPipelineRunner."""
-
-import logging
-import unittest
-
-from apache_beam import Pipeline
-import apache_beam.examples.snippets.snippets_test as snippets_test
-import apache_beam.io.fileio_test as fileio_test
-import apache_beam.io.textio_test as textio_test
-import apache_beam.io.sources_test as sources_test
-import apache_beam.pipeline_test as pipeline_test
-import apache_beam.pvalue_test as pvalue_test
-from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner
-import apache_beam.transforms.aggregator_test as aggregator_test
-import apache_beam.transforms.combiners_test as combiners_test
-import apache_beam.transforms.ptransform_test as ptransform_test
-import apache_beam.transforms.trigger_test as trigger_test
-import apache_beam.transforms.window_test as window_test
-import apache_beam.transforms.write_ptransform_test as write_ptransform_test
-import apache_beam.typehints.typed_pipeline_test as typed_pipeline_test
-
-
-class TestWithInProcessPipelineRunner(object):
-
-  def setUp(self):
-    original_init = Pipeline.__init__
-
-    def override_pipeline_init(self, runner=None, options=None, argv=None):
-      # Ignore the requested runner and force InProcessPipelineRunner so that
-      # all inherited test cases execute on it.
-      runner = InProcessPipelineRunner()
-      return original_init(self, runner, options, argv)
-
-    self.runner_name = None
-    self.original_init = original_init
-    Pipeline.__init__ = override_pipeline_init
-
-  def tearDown(self):
-    Pipeline.__init__ = self.original_init
-
-
-class InProcessPipelineRunnerPipelineTest(
-    TestWithInProcessPipelineRunner, pipeline_test.PipelineTest):
-
-  def test_cached_pvalues_are_refcounted(self):
-    # InProcessPipelineRunner does not have a refcounted cache.
-    pass
-
-  def test_eager_pipeline(self):
-    # Tests eager runner only
-    pass
-
-
-class InProcessPipelineRunnerSnippetsTest(
-    TestWithInProcessPipelineRunner, snippets_test.SnippetsTest,
-    snippets_test.ParDoTest, snippets_test.TypeHintsTest,
-    snippets_test.CombineTest):
-  pass
-
-
-class InProcessPipelineRunnerTransform(
-    TestWithInProcessPipelineRunner, aggregator_test.AggregatorTest,
-    combiners_test.CombineTest, ptransform_test.PTransformTest,
-    pvalue_test.PValueTest, window_test.WindowTest,
-    typed_pipeline_test.MainInputTest, typed_pipeline_test.SideInputTest,
-    typed_pipeline_test.CustomTransformTest, trigger_test.TriggerPipelineTest,
-    write_ptransform_test.WriteTest):
-  pass
-
-
-class TestTextFileSource(
-    TestWithInProcessPipelineRunner, fileio_test.TestTextFileSource):
-  pass
-
-
-class TestNativeTextFileSink(
-    TestWithInProcessPipelineRunner, fileio_test.TestNativeTextFileSink):
-
-  def setUp(self):
-    TestWithInProcessPipelineRunner.setUp(self)
-    fileio_test.TestNativeTextFileSink.setUp(self)
-
-
-class TestTextFileSink(
-    TestWithInProcessPipelineRunner, textio_test.TextSinkTest):
-
-  def setUp(self):
-    TestWithInProcessPipelineRunner.setUp(self)
-    textio_test.TextSinkTest.setUp(self)
-
-
-class MyFileSink(TestWithInProcessPipelineRunner, fileio_test.MyFileSink):
-  pass
-
-
-class TestFileSink(TestWithInProcessPipelineRunner, fileio_test.TestFileSink):
-  pass
-
-
-class SourcesTest(TestWithInProcessPipelineRunner, sources_test.SourcesTest):
-  pass
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.DEBUG)
-  unittest.main()

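The TestWithInProcessPipelineRunner mixin above relies on a common testing
trick: monkey-patch Pipeline.__init__ in setUp so that every pipeline built
by the inherited test cases is forced onto the runner under test, then
restore the original constructor in tearDown. The same pattern in isolation
(ForcedRunnerMixin and runner_factory are names invented here):

  from apache_beam import Pipeline

  class ForcedRunnerMixin(object):
    """Pins every Pipeline built during a test to a chosen runner."""

    # Subclasses set this to a runner class, e.g. InProcessPipelineRunner.
    runner_factory = None

    def setUp(self):
      original_init = Pipeline.__init__

      def patched_init(pipeline, runner=None, options=None, argv=None):
        # Discard the requested runner and substitute the forced one.
        return original_init(pipeline, self.runner_factory(), options, argv)

      self._original_init = original_init
      Pipeline.__init__ = patched_init

    def tearDown(self):
      # Restore the real constructor so later tests are unaffected.
      Pipeline.__init__ = self._original_init

Multiple inheritance then does the rest: listing the mixin first in the base
class list ensures its setUp is the one invoked (or invoked first, when a
subclass calls both explicitly, as TestNativeTextFileSink does above).
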
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py b/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
deleted file mode 100644
index 798ebfb..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-from __future__ import absolute_import
-
-
-class InProcessTransformResult(object):
-  """The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-  def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
-               timer_update, counters, watermark_hold,
-               undeclared_tag_values=None):
-    self._applied_ptransform = applied_ptransform
-    self._uncommitted_output_bundles = uncommitted_output_bundles
-    self._state = state
-    self._timer_update = timer_update
-    self._counters = counters
-    self._watermark_hold = watermark_hold
-    # Only used when caching (materializing) all values is requested.
-    self._undeclared_tag_values = undeclared_tag_values
-
-  @property
-  def transform(self):
-    return self._applied_ptransform
-
-  @property
-  def output_bundles(self):
-    return self._uncommitted_output_bundles
-
-  @property
-  def state(self):
-    return self._state
-
-  @property
-  def counters(self):
-    return self._counters
-
-  @property
-  def watermark_hold(self):
-    return self._watermark_hold
-
-  @property
-  def undeclared_tag_values(self):
-    return self._undeclared_tag_values
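
InProcessTransformResult is a plain immutable value object: every field is
assigned once in __init__ and exposed through read-only properties, so
results can be handed between executor threads without further locking. A
hedged sketch of how an evaluator's finish_bundle might assemble one (the
attributes on self are placeholders, not Beam's evaluator API):

  # Hypothetical evaluator method; the argument values are illustrative only.
  def finish_bundle(self):
    return InProcessTransformResult(
        self._applied_ptransform,   # transform that was evaluated
        self._output_bundles,       # uncommitted output bundles
        None,                       # state: none kept by this evaluator
        [],                         # timer_update: no timers set
        self._counters,             # counters accumulated during processing
        None)                       # watermark_hold: nothing held back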