You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:59:34 UTC
[4/5] incubator-beam git commit: Renames InprocessPipelineRunner to
DirectPipelineRunner and removes the existing DirectPipelineRunner. Renamed
the folder to direct to keep all related files in the same folder. Removed
inprocess prefix from file names/cl
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
new file mode 100644
index 0000000..f439731
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Manages watermarks of PCollections and AppliedPTransforms."""
+
+from __future__ import absolute_import
+
+import threading
+
+from apache_beam import pipeline
+from apache_beam import pvalue
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+
+
+class WatermarkManager(object):
+ """Tracks and updates watermarks for all AppliedPTransforms."""
+
+ WATERMARK_POS_INF = MAX_TIMESTAMP
+ WATERMARK_NEG_INF = MIN_TIMESTAMP
+
+ def __init__(self, clock, root_transforms, value_to_consumers):
+ self._clock = clock # processing time clock
+ self._value_to_consumers = value_to_consumers
+ self._root_transforms = root_transforms
+ # AppliedPTransform -> TransformWatermarks
+ self._transform_to_watermarks = {}
+
+ for root_transform in root_transforms:
+ self._transform_to_watermarks[root_transform] = TransformWatermarks(
+ self._clock)
+
+ for consumers in value_to_consumers.values():
+ for consumer in consumers:
+ self._transform_to_watermarks[consumer] = TransformWatermarks(
+ self._clock)
+
+ for consumers in value_to_consumers.values():
+ for consumer in consumers:
+ self._update_input_transform_watermarks(consumer)
+
+ def _update_input_transform_watermarks(self, applied_ptransform):
+ assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+ input_transform_watermarks = []
+ for input_pvalue in applied_ptransform.inputs:
+ assert input_pvalue.producer or isinstance(input_pvalue, pvalue.PBegin)
+ if input_pvalue.producer:
+ input_transform_watermarks.append(
+ self.get_watermarks(input_pvalue.producer))
+ self._transform_to_watermarks[
+ applied_ptransform].update_input_transform_watermarks(
+ input_transform_watermarks)
+
+ def get_watermarks(self, applied_ptransform):
+ """Gets the input and output watermarks for an AppliedPTransform.
+
+ If the applied_ptransform has not processed any elements, return a
+ watermark with minimum value.
+
+ Args:
+ applied_ptransform: AppliedPTransform to get the watermarks for.
+
+ Returns:
+ A snapshot (TransformWatermarks) of the input watermark and output
+ watermark for the provided transform.
+ """
+
+ # TODO(altay): Composite transforms should have a composite watermark. Until
+ # then they are represented by their last transform.
+ while applied_ptransform.parts:
+ applied_ptransform = applied_ptransform.parts[-1]
+
+ return self._transform_to_watermarks[applied_ptransform]
+
+ def update_watermarks(self, completed_committed_bundle, applied_ptransform,
+ timer_update, outputs, earliest_hold):
+ assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+ self._update_pending(
+ completed_committed_bundle, applied_ptransform, timer_update, outputs)
+ tw = self.get_watermarks(applied_ptransform)
+ tw.hold(earliest_hold)
+ self._refresh_watermarks(applied_ptransform)
+
+ def _update_pending(self, input_committed_bundle, applied_ptransform,
+ timer_update, output_committed_bundles):
+ """Updates the list of pending bundles for the given AppliedPTransform."""
+
+ # Update pending elements. Filter out empty bundles. They do not impact
+ # watermarks and should not trigger downstream execution.
+ for output in output_committed_bundles:
+ if output.elements:
+ if output.pcollection in self._value_to_consumers:
+ consumers = self._value_to_consumers[output.pcollection]
+ for consumer in consumers:
+ consumer_tw = self._transform_to_watermarks[consumer]
+ consumer_tw.add_pending(output)
+
+ completed_tw = self._transform_to_watermarks[applied_ptransform]
+ completed_tw.update_timers(timer_update)
+
+ assert input_committed_bundle or applied_ptransform in self._root_transforms
+ if input_committed_bundle and input_committed_bundle.elements:
+ completed_tw.remove_pending(input_committed_bundle)
+
+ def _refresh_watermarks(self, applied_ptransform):
+ assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+ tw = self.get_watermarks(applied_ptransform)
+ if tw.refresh():
+ for pval in applied_ptransform.outputs.values():
+ if isinstance(pval, pvalue.DoOutputsTuple):
+ pvals = (v for v in pval)
+ else:
+ pvals = (pval,)
+ for v in pvals:
+ if v in self._value_to_consumers: # If there are downstream consumers
+ consumers = self._value_to_consumers[v]
+ for consumer in consumers:
+ self._refresh_watermarks(consumer)
+
+ def extract_fired_timers(self):
+ all_timers = []
+ for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
+ if tw.extract_fired_timers():
+ all_timers.append(applied_ptransform)
+ return all_timers
+
+
+class TransformWatermarks(object):
+ """Tracks input and output watermarks for an AppliedPTransform."""
+
+ def __init__(self, clock):
+ self._clock = clock
+ self._input_transform_watermarks = []
+ self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
+ self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
+ self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
+ self._pending = set() # Scheduled bundles targeted for this transform.
+ self._fired_timers = False
+ self._lock = threading.Lock()
+
+ def update_input_transform_watermarks(self, input_transform_watermarks):
+ with self._lock:
+ self._input_transform_watermarks = input_transform_watermarks
+
+ def update_timers(self, timer_update):
+ with self._lock:
+ if timer_update:
+ assert self._fired_timers
+ self._fired_timers = False
+
+ @property
+ def input_watermark(self):
+ with self._lock:
+ return self._input_watermark
+
+ @property
+ def output_watermark(self):
+ with self._lock:
+ return self._output_watermark
+
+ def hold(self, value):
+ with self._lock:
+ if value is None:
+ value = WatermarkManager.WATERMARK_POS_INF
+ self._earliest_hold = value
+
+ def add_pending(self, pending):
+ with self._lock:
+ self._pending.add(pending)
+
+ def remove_pending(self, completed):
+ with self._lock:
+ # Ignore repeated removes. This will happen if a transform has a repeated
+ # input.
+ if completed in self._pending:
+ self._pending.remove(completed)
+
+ def refresh(self):
+ with self._lock:
+ pending_holder = (WatermarkManager.WATERMARK_NEG_INF
+ if self._pending else
+ WatermarkManager.WATERMARK_POS_INF)
+
+ input_watermarks = [
+ tw.output_watermark for tw in self._input_transform_watermarks]
+ input_watermarks.append(WatermarkManager.WATERMARK_POS_INF)
+ producer_watermark = min(input_watermarks)
+
+ self._input_watermark = max(self._input_watermark,
+ min(pending_holder, producer_watermark))
+ new_output_watermark = min(self._input_watermark, self._earliest_hold)
+
+ advanced = new_output_watermark > self._output_watermark
+ self._output_watermark = new_output_watermark
+ return advanced
+
+ @property
+ def synchronized_processing_output_time(self):
+ return self._clock.now
+
+ def extract_fired_timers(self):
+ with self._lock:
+ if self._fired_timers:
+ return False
+
+ should_fire = (
+ self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
+ self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
+ self._fired_timers = should_fire
+ return should_fire
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
deleted file mode 100644
index c4c52b3..0000000
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ /dev/null
@@ -1,308 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""DirectPipelineRunner, executing on the local machine.
-
-The DirectPipelineRunner class implements what is called in Dataflow
-parlance the "direct runner". Such a runner executes the entire graph
-of transformations belonging to a pipeline on the local machine.
-"""
-
-from __future__ import absolute_import
-
-import collections
-import itertools
-import logging
-
-from apache_beam import coders
-from apache_beam import error
-from apache_beam.runners.common import DoFnRunner
-from apache_beam.runners.common import DoFnState
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import sideinputs
-from apache_beam.transforms.window import GlobalWindows
-from apache_beam.transforms.window import WindowedValue
-from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
-from apache_beam.typehints.typecheck import TypeCheckError
-from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
-from apache_beam.utils import counters
-from apache_beam.utils.options import TypeOptions
-
-
-class DirectPipelineRunner(PipelineRunner):
- """A local pipeline runner.
-
- The runner computes everything locally and does not make any attempt to
- optimize for time or space.
- """
-
- def __init__(self, cache=None):
- # Cache of values computed while the runner executes a pipeline.
- self._cache = cache if cache is not None else PValueCache()
- self._counter_factory = counters.CounterFactory()
- # Element counts used for debugging footprint issues in the direct runner.
- # The values computed are used only for logging and do not take part in
- # any decision making logic. The key for the counter dictionary is either
- # the full label for the transform producing the elements or a tuple
- # (full label, output tag) for ParDo transforms since they can output values
- # on multiple outputs.
- self.debug_counters = {}
- self.debug_counters['element_counts'] = collections.Counter()
-
- @property
- def cache(self):
- return self._cache
-
- def get_pvalue(self, pvalue):
- """Gets the PValue's computed value from the runner's cache."""
- try:
- return self._cache.get_pvalue(pvalue)
- except KeyError:
- raise error.PValueError('PValue is not computed.')
-
- def clear_pvalue(self, pvalue):
- """Removes a PValue from the runner's cache."""
- self._cache.clear_pvalue(pvalue)
-
- def skip_if_cached(func): # pylint: disable=no-self-argument
- """Decorator to skip execution of a transform if value is cached."""
-
- def func_wrapper(self, pvalue, *args, **kwargs):
- logging.debug('Current: Debug counters: %s', self.debug_counters)
- if self._cache.is_cached(pvalue): # pylint: disable=protected-access
- return
- else:
- func(self, pvalue, *args, **kwargs)
- return func_wrapper
-
- def run(self, pipeline):
- super(DirectPipelineRunner, self).run(pipeline)
- logging.info('Final: Debug counters: %s', self.debug_counters)
- return DirectPipelineResult(state=PipelineState.DONE,
- counter_factory=self._counter_factory)
-
- @skip_if_cached
- def run_CreatePCollectionView(self, transform_node):
- transform = transform_node.transform
- view = transform.view
- values = self._cache.get_pvalue(transform_node.inputs[0])
- result = sideinputs.SideInputMap(type(view), view._view_options(), values)
- self._cache.cache_output(transform_node, result)
-
- @skip_if_cached
- def run_ParDo(self, transform_node):
- transform = transform_node.transform
-
- side_inputs = [self._cache.get_pvalue(view)
- for view in transform_node.side_inputs]
-
- # TODO(robertwb): Do this type checking inside DoFnRunner to get it on
- # remote workers as well?
- options = transform_node.inputs[0].pipeline.options
- if options is not None and options.view_as(TypeOptions).runtime_type_check:
- transform.dofn = TypeCheckWrapperDoFn(
- transform.dofn, transform.get_type_hints())
-
- # TODO(robertwb): Should this be conditionally done on the workers as well?
- transform.dofn = OutputCheckWrapperDoFn(
- transform.dofn, transform_node.full_label)
-
- class RecordingReceiverSet(object):
-
- def __init__(self, tag):
- self.tag = tag
-
- def output(self, element):
- results[self.tag].append(element)
-
- class TaggedReceivers(dict):
-
- def __missing__(self, key):
- return RecordingReceiverSet(key)
-
- results = collections.defaultdict(list)
- # Some tags may be empty.
- for tag in transform.side_output_tags:
- results[tag] = []
-
- runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs,
- side_inputs, transform_node.inputs[0].windowing,
- tagged_receivers=TaggedReceivers(),
- step_name=transform_node.full_label,
- state=DoFnState(self._counter_factory))
- runner.start()
- for v in self._cache.get_pvalue(transform_node.inputs[0]):
- runner.process(v)
- runner.finish()
-
- self._cache.cache_output(transform_node, [])
- for tag, value in results.items():
- self.debug_counters['element_counts'][
- (transform_node.full_label, tag)] += len(value)
- self._cache.cache_output(transform_node, tag, value)
-
- @skip_if_cached
- def run_GroupByKeyOnly(self, transform_node):
- result_dict = collections.defaultdict(list)
- # The input type of a GroupByKey will be KV[Any, Any] or more specific.
- kv_type_hint = transform_node.transform.get_type_hints().input_types[0]
- key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
- for wv in self._cache.get_pvalue(transform_node.inputs[0]):
- if (isinstance(wv, WindowedValue) and
- isinstance(wv.value, collections.Iterable) and len(wv.value) == 2):
- k, v = wv.value
- # We use as key a string encoding of the key object to support keys
- # that are based on custom classes. This mimics also the remote
- # execution behavior where key objects are encoded before being written
- # to the shuffler system responsible for grouping.
- result_dict[key_coder.encode(k)].append(v)
- else:
- raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
- 'windowed key-value pairs. Instead received: %r.'
- % wv)
-
- gbk_result = map(
- GlobalWindows.windowed_value,
- ((key_coder.decode(k), v) for k, v in result_dict.iteritems()))
- self.debug_counters['element_counts'][
- transform_node.full_label] += len(gbk_result)
- self._cache.cache_output(transform_node, gbk_result)
-
- @skip_if_cached
- def run_Create(self, transform_node):
- transform = transform_node.transform
- create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
- self.debug_counters['element_counts'][
- transform_node.full_label] += len(create_result)
- self._cache.cache_output(transform_node, create_result)
-
- @skip_if_cached
- def run_Flatten(self, transform_node):
- flatten_result = list(
- itertools.chain.from_iterable(
- self._cache.get_pvalue(pc) for pc in transform_node.inputs))
- self.debug_counters['element_counts'][
- transform_node.full_label] += len(flatten_result)
- self._cache.cache_output(transform_node, flatten_result)
-
- @skip_if_cached
- def run_Read(self, transform_node):
- # TODO(chamikara) Implement a more generic way for passing PipelineOptions
- # to sources and sinks when using DirectRunner.
- source = transform_node.transform.source
- source.pipeline_options = transform_node.inputs[0].pipeline.options
-
- def read_values(reader):
- read_result = [GlobalWindows.windowed_value(e) for e in reader]
- self.debug_counters['element_counts'][
- transform_node.full_label] += len(read_result)
- self._cache.cache_output(transform_node, read_result)
-
- # pylint: disable=wrong-import-position
- from apache_beam.io import iobase
-
- if isinstance(source, iobase.BoundedSource):
- # Getting a RangeTracker for the default range of the source and reading
- # the full source using that.
- range_tracker = source.get_range_tracker(None, None)
- reader = source.read(range_tracker)
- read_values(reader)
- else:
- with source.reader() as reader:
- read_values(reader)
-
- @skip_if_cached
- def run__NativeWrite(self, transform_node):
- sink = transform_node.transform.sink
-
- # pylint: disable=wrong-import-position
- from apache_beam.io import fileio
-
- if isinstance(sink, fileio.NativeTextFileSink):
- assert sink.num_shards in (0, 1)
- if sink.shard_name_template:
- sink.file_path += '-00000-of-00001'
- sink.file_path += sink.file_name_suffix
- sink.pipeline_options = transform_node.inputs[0].pipeline.options
- with sink.writer() as writer:
- for v in self._cache.get_pvalue(transform_node.inputs[0]):
- self.debug_counters['element_counts'][transform_node.full_label] += 1
- writer.Write(v.value)
-
-
-class DirectPipelineResult(PipelineResult):
- """A DirectPipelineResult provides access to info about a pipeline."""
-
- def __init__(self, state, counter_factory=None):
- super(DirectPipelineResult, self).__init__(state)
- self._counter_factory = counter_factory
-
- def aggregated_values(self, aggregator_or_name):
- return self._counter_factory.get_aggregator_values(aggregator_or_name)
-
-
-class EagerPipelineRunner(DirectPipelineRunner):
-
- is_eager = True
-
- def __init__(self):
- super(EagerPipelineRunner, self).__init__()
- self._seen_transforms = set()
-
- def run_transform(self, transform):
- if transform not in self._seen_transforms:
- self._seen_transforms.add(transform)
- super(EagerPipelineRunner, self).run_transform(transform)
-
-
-class DiskCachedPipelineRunner(DirectPipelineRunner):
- """A DirectPipelineRunner that uses a disk backed cache.
-
- DiskCachedPipelineRunner uses a temporary disk backed cache for running
- pipelines. This allows for running pipelines that will require more memory
- than it is available, however this comes with a performance cost due to disk
- IO.
-
- Memory requirement for DiskCachedPipelineRunner is approximately capped by the
- single transform in the pipeline that consumes and outputs largest total
- collection (i.e. inputs, side-inputs and outputs in aggregate). In the extreme
- case a where a transform will use all previous intermediate values as input,
- memory requirements for DiskCachedPipelineRunner will be the same as
- DirectPipelineRunner.
- """
-
- def __init__(self):
- self._null_cache = ()
- super(DiskCachedPipelineRunner, self).__init__(self._null_cache)
-
- def run(self, pipeline):
- try:
- self._cache = PValueCache(use_disk_backed_cache=True)
- return super(DirectPipelineRunner, self).run(pipeline)
- finally:
- del self._cache
- self._cache = self._null_cache
-
- @property
- def cache(self):
- raise NotImplementedError(
- 'DiskCachedPipelineRunner does not keep cache outside the scope of its '
- 'run method.')
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/__init__.py b/sdks/python/apache_beam/runners/inprocess/__init__.py
deleted file mode 100644
index 53e725a..0000000
--- a/sdks/python/apache_beam/runners/inprocess/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Inprocess runner executes pipelines locally in a single process."""
-from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py b/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
deleted file mode 100644
index d284449..0000000
--- a/sdks/python/apache_beam/runners/inprocess/bundle_factory.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A factory that creates UncommittedBundles."""
-
-from __future__ import absolute_import
-
-from apache_beam import pvalue
-
-
-class BundleFactory(object):
- """BundleFactory creates output bundles to be used by transform evaluators."""
-
- def create_bundle(self, output_pcollection):
- return Bundle(output_pcollection)
-
- def create_empty_committed_bundle(self, output_pcollection):
- bundle = self.create_bundle(output_pcollection)
- bundle.commit(None)
- return bundle
-
-
-# a bundle represents a unit of work that will be processed by a transform.
-class Bundle(object):
- """Part of a PCollection with output elements.
-
- Part of a PCollection. Elements are output to a bundle, which will cause them
- to be executed by PTransform that consume the PCollection this bundle is a
- part of at a later point. It starts as an uncommitted bundle and can have
- elements added to it. It needs to be committed to make it immutable before
- passing it to a downstream ptransform.
- """
-
- def __init__(self, pcollection):
- assert (isinstance(pcollection, pvalue.PCollection)
- or isinstance(pcollection, pvalue.PCollectionView))
- self._pcollection = pcollection
- self._elements = []
- self._committed = False
- self._tag = None # optional tag information for this bundle
-
- @property
- def elements(self):
- """Returns iterable elements. If not committed will return a copy."""
- if self._committed:
- return self._elements
- else:
- return list(self._elements)
-
- @property
- def tag(self):
- return self._tag
-
- @tag.setter
- def tag(self, value):
- assert not self._tag
- self._tag = value
-
- @property
- def pcollection(self):
- """PCollection that the elements of this UncommittedBundle belong to."""
- return self._pcollection
-
- def add(self, element):
- """Outputs an element to this bundle.
-
- Args:
- element: WindowedValue
- """
- assert not self._committed
- self._elements.append(element)
-
- def output(self, element):
- self.add(element)
-
- def commit(self, synchronized_processing_time):
- """Commits this bundle.
-
- Uncommitted bundle will become committed (immutable) after this call.
-
- Args:
- synchronized_processing_time: the synchronized processing time at which
- this bundle was committed
- """
- assert not self._committed
- self._committed = True
- self._elements = tuple(self._elements)
- self._synchronized_processing_time = synchronized_processing_time
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/clock.py b/sdks/python/apache_beam/runners/inprocess/clock.py
deleted file mode 100644
index 11e49cd..0000000
--- a/sdks/python/apache_beam/runners/inprocess/clock.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Clock implementations for real time processing and testing."""
-
-from __future__ import absolute_import
-
-import time
-
-
-class Clock(object):
-
- @property
- def now(self):
- """Returns the number of milliseconds since epoch."""
- return int(time.time() * 1000)
-
-
-class MockClock(Clock):
- """Mock clock implementation for testing."""
-
- def __init__(self, now_in_ms):
- self._now_in_ms = now_in_ms
-
- @property
- def now(self):
- return self._now_in_ms
-
- @now.setter
- def now(self, value_in_ms):
- assert value_in_ms >= self._now_in_ms
- self._now_in_ms = value_in_ms
-
- def advance(self, duration_in_ms):
- assert duration_in_ms >= 0
- self._now_in_ms += duration_in_ms
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
deleted file mode 100644
index 6f1757a..0000000
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""ConsumerTrackingPipelineVisitor, a PipelineVisitor object."""
-
-from __future__ import absolute_import
-
-from apache_beam import pvalue
-from apache_beam.pipeline import PipelineVisitor
-
-
-class ConsumerTrackingPipelineVisitor(PipelineVisitor):
- """Visitor for extracting value-consumer relations from the graph.
-
- Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This
- is used to schedule consuming PTransforms to consume input after the upstream
- transform has produced and committed output.
- """
-
- def __init__(self):
- self.value_to_consumers = {} # Map from PValue to [AppliedPTransform].
- self.root_transforms = set() # set of (root) AppliedPTransforms.
- self.views = [] # list of PCollectionViews.
- self.step_names = {} # Map from AppliedPTransform to String.
-
- self._num_transforms = 0
-
- def visit_value(self, value, producer_node):
- if value:
- if isinstance(value, pvalue.PCollectionView):
- self.views.append(value)
-
- def visit_transform(self, applied_ptransform):
- inputs = applied_ptransform.inputs
- if inputs:
- for input_value in inputs:
- if isinstance(input_value, pvalue.PBegin):
- self.root_transforms.add(applied_ptransform)
- if input_value not in self.value_to_consumers:
- self.value_to_consumers[input_value] = []
- self.value_to_consumers[input_value].append(applied_ptransform)
- else:
- self.root_transforms.add(applied_ptransform)
- self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
- self._num_transforms += 1
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
deleted file mode 100644
index 3cd8d73..0000000
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for consumer_tracking_pipeline_visitor."""
-
-import logging
-import unittest
-
-from apache_beam import pvalue
-from apache_beam.io import Read
-from apache_beam.io import TextFileSource
-from apache_beam.pipeline import Pipeline
-from apache_beam.pvalue import AsList
-from apache_beam.runners.inprocess import InProcessPipelineRunner
-from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
-from apache_beam.transforms import CoGroupByKey
-from apache_beam.transforms import Create
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import FlatMap
-from apache_beam.transforms import Flatten
-from apache_beam.transforms import ParDo
-
-# Disable frequent lint warning due to pipe operator for chaining transforms.
-# pylint: disable=expression-not-assigned
-# pylint: disable=pointless-statement
-
-
class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
  """Runs ConsumerTrackingPipelineVisitor over a few small pipelines."""

  def setUp(self):
    self.pipeline = Pipeline(InProcessPipelineRunner())
    self.visitor = ConsumerTrackingPipelineVisitor()

  @staticmethod
  def _sorted_transforms(applied_ptransforms):
    """Returns the sorted PTransforms of the given AppliedPTransforms."""
    return sorted([applied.transform for applied in applied_ptransforms])

  def test_root_transforms(self):
    root_create = Create('create', [[1, 2, 3]])
    root_read = Read('read', TextFileSource('/tmp/somefile'))
    root_flatten = Flatten('flatten', pipeline=self.pipeline)

    pbegin = pvalue.PBegin(self.pipeline)
    pcoll_create = pbegin | root_create
    pbegin | root_read
    pcoll_create | FlatMap(lambda x: x)
    [] | root_flatten

    self.pipeline.visit(self.visitor)

    # Both PBegin consumers and the input-less flatten count as roots.
    expected_roots = sorted([root_read, root_create, root_flatten])
    self.assertEqual(
        self._sorted_transforms(self.visitor.root_transforms), expected_roots)

    expected_consumers = sorted([root_read, root_create])
    self.assertEqual(
        self._sorted_transforms(self.visitor.value_to_consumers[pbegin]),
        expected_consumers)
    self.assertEqual(len(self.visitor.step_names), 4)

  def test_side_inputs(self):

    class SplitNumbersFn(DoFn):

      def process(self, context):
        if context.element < 0:
          yield pvalue.SideOutputValue('tag_negative', context.element)
        else:
          yield context.element

    class ProcessNumbersFn(DoFn):

      def process(self, context, negatives):
        yield context.element

    root_create = Create('create', [[-1, 2, 3]])

    split = ParDo(SplitNumbersFn()).with_outputs('tag_negative',
                                                 main='positive')
    result = self.pipeline | root_create | split
    positive, negative = result
    positive | ParDo(ProcessNumbersFn(), AsList(negative))

    self.pipeline.visit(self.visitor)

    self.assertEqual(
        self._sorted_transforms(self.visitor.root_transforms),
        sorted([root_create]))
    self.assertEqual(len(self.visitor.step_names), 4)
    self.assertEqual(len(self.visitor.views), 1)
    self.assertTrue(isinstance(self.visitor.views[0],
                               pvalue.ListPCollectionView))

  def test_co_group_by_key(self):
    emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
    phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
    {'emails': emails, 'phones': phones} | CoGroupByKey()

    self.pipeline.visit(self.visitor)

    root_transforms = self._sorted_transforms(self.visitor.root_transforms)
    self.assertEqual(len(root_transforms), 2)
    self.assertGreater(
        len(self.visitor.step_names), 3)  # 2 creates + expanded CoGBK
    self.assertEqual(len(self.visitor.views), 0)
-
-
if __name__ == '__main__':
  # Set the root logger to DEBUG before handing control to unittest.
  logging.getLogger().setLevel(logging.DEBUG)
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
deleted file mode 100644
index 7af1608..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ /dev/null
@@ -1,272 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""InProcessEvaluationContext tracks global state, triggers and watermarks."""
-
-from __future__ import absolute_import
-
-import collections
-import threading
-
-from apache_beam.transforms import sideinputs
-from apache_beam.runners.inprocess.clock import Clock
-from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
-from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
-from apache_beam.utils import counters
-
-
class _InProcessExecutionContext(object):
  """Read-only pairing of a watermarks object with an existing state object.

  Both values are stored at construction time and exposed through properties
  that have no setter.
  """

  def __init__(self, watermarks, existing_state):
    self._existing_state = existing_state
    self._watermarks = watermarks

  watermarks = property(
      lambda self: self._watermarks,
      doc='The watermarks object passed to the constructor.')

  existing_state = property(
      lambda self: self._existing_state,
      doc='The state object passed to the constructor.')
-
-
class _InProcessSideInputView(object):
  """Mutable bookkeeping record kept for a single side input view.

  Attributes:
    callable_queue: deque of the tasks queued against this view.
    value: the stored value; None until one is assigned.
    has_result: whether a value has been stored; starts out False.
  """

  def __init__(self, view):
    self.has_result = False
    self.value = None
    self.callable_queue = collections.deque()
    self._view = view
-
-
class _InProcessSideInputsContainer(object):
  """Lock-guarded registry holding the values of PCollectionViews.

  A task that asks for a view without a value is queued on that view and
  flagged as blocked. Storing the value of a view hands the queued tasks back
  to the caller, unblocked.
  """

  def __init__(self, views):
    self._lock = threading.Lock()
    self._views = dict(
        (view, _InProcessSideInputView(view)) for view in views)

  def get_value_or_schedule_after_output(self, pcollection_view, task):
    """Returns the view's value, or queues the task when there is none yet.

    Args:
      pcollection_view: key of the requested view; must be registered.
      task: object queued on the view and marked blocked when no value exists.

    Returns:
      A (has_result, value) tuple for the view.
    """
    with self._lock:
      entry = self._views[pcollection_view]
      pending = not entry.has_result
      if pending:
        entry.callable_queue.append(task)
        task.blocked = True
      return (entry.has_result, entry.value)

  def set_value_and_get_callables(self, pcollection_view, values):
    """Stores the value of a view and returns the tasks queued on it.

    The value may be set only once per view. Every returned task has its
    blocked flag cleared, and the view's queue is discarded afterwards.

    Args:
      pcollection_view: key of the view being populated; must be registered.
      values: iterable of values backing the side input.

    Returns:
      A tuple of the tasks that were queued on the view.
    """
    with self._lock:
      entry = self._views[pcollection_view]
      assert not entry.has_result
      assert entry.value is None
      assert entry.callable_queue is not None
      entry.value = self._pvalue_to_value(pcollection_view, values)
      unblocked = tuple(entry.callable_queue)
      for queued_task in unblocked:
        queued_task.blocked = False
      entry.callable_queue = None
      entry.has_result = True
      return unblocked

  def _pvalue_to_value(self, view, values):
    """Wraps the values of a view in a sideinputs.SideInputMap.

    Args:
      view: PCollectionView for the requested side input.
      values: Iterable values associated with the side input.

    Returns:
      A SideInputMap built from the view's type, its view options and values.
    """
    view_type = type(view)
    return sideinputs.SideInputMap(view_type, view._view_options(), values)
-
-
class InProcessEvaluationContext(object):
  """Evaluation context with the global state information of the pipeline.

  The evaluation context for a specific pipeline being executed by the
  InProcessPipelineRunner. Contains state shared within the execution across
  all transforms.

  InProcessEvaluationContext contains shared state for an execution of the
  InProcessPipelineRunner that can be used while evaluating a PTransform. This
  consists of views into underlying state and watermark implementations, access
  to read and write PCollectionViews, and constructing counter sets and
  execution contexts. This includes executing callbacks asynchronously when
  state changes to the appropriate point (e.g. when a PCollectionView is
  requested and known to be empty).

  InProcessEvaluationContext also handles results by committing finalizing
  bundles based on the current global state and updating the global state
  appropriately. This includes updating the per-(step,key) state, updating
  global watermarks, and executing any callbacks that can be executed.
  """

  def __init__(self, pipeline_options, bundle_factory, root_transforms,
               value_to_consumers, step_names, views):
    self.pipeline_options = pipeline_options
    self._bundle_factory = bundle_factory
    self._root_transforms = root_transforms
    self._value_to_consumers = value_to_consumers
    self._step_names = step_names
    self.views = views

    # AppliedPTransform -> Evaluator specific state objects
    self._application_state_interals = {}
    self._watermark_manager = InProcessWatermarkManager(
        Clock(), root_transforms, value_to_consumers)
    self._side_inputs_container = _InProcessSideInputsContainer(views)
    # Tasks unblocked by a side input, waiting to be resubmitted.
    self._pending_unblocked_tasks = []
    self._counter_factory = counters.CounterFactory()
    self._cache = None  # None means that no cache is in use.

    self._lock = threading.Lock()

  def use_pvalue_cache(self, cache):
    """Installs the cache that append_to_cache writes to."""
    assert not self._cache
    self._cache = cache

  @property
  def has_cache(self):
    """True once a cache has been installed with use_pvalue_cache."""
    return self._cache is not None

  def append_to_cache(self, applied_ptransform, tag, elements):
    """Appends elements to the installed cache, under the context lock.

    Args:
      applied_ptransform: the transform that produced the elements.
      tag: the output tag the elements belong to.
      elements: the elements to append.
    """
    with self._lock:
      # Test against None, exactly as has_cache does: a cache object that is
      # falsy while still empty is a valid cache to append to.
      assert self._cache is not None
      self._cache.append(applied_ptransform, tag, elements)

  def is_root_transform(self, applied_ptransform):
    """Returns whether the transform is one of the root transforms."""
    return applied_ptransform in self._root_transforms

  def handle_result(
      self, completed_bundle, completed_timers, result):
    """Handle the provided result produced after evaluating the input bundle.

    Handle the provided InProcessTransformResult, produced after evaluating
    the provided committed bundle (potentially None, if the result of a root
    PTransform).

    The result is the output of running the transform contained in the
    InProcessTransformResult on the contents of the provided bundle.

    Args:
      completed_bundle: the bundle that was processed to produce the result.
      completed_timers: the timers that were delivered to produce the
                        completed_bundle.
      result: the InProcessTransformResult of evaluating the input bundle

    Returns:
      the committed bundles contained within the handled result.
    """
    with self._lock:
      committed_bundles = self._commit_bundles(result.output_bundles)
      self._watermark_manager.update_watermarks(
          completed_bundle, result.transform, completed_timers,
          committed_bundles, result.watermark_hold)

      # If the result is for a view, update side inputs container.
      if (result.output_bundles
          and result.output_bundles[0].pcollection in self.views):
        if committed_bundles:
          assert len(committed_bundles) == 1
          side_input_result = committed_bundles[0].elements
        else:
          side_input_result = []
        tasks = self._side_inputs_container.set_value_and_get_callables(
            result.output_bundles[0].pcollection, side_input_result)
        self._pending_unblocked_tasks.extend(tasks)

      # Fold the counters of this result into the pipeline-wide counters.
      if result.counters:
        for counter in result.counters:
          merged_counter = self._counter_factory.get_counter(
              counter.name, counter.combine_fn)
          merged_counter.accumulator.merge([counter.accumulator])

      self._application_state_interals[result.transform] = result.state
      return committed_bundles

  def get_aggregator_values(self, aggregator_or_name):
    """Delegates to the counter factory's get_aggregator_values."""
    return self._counter_factory.get_aggregator_values(aggregator_or_name)

  def schedule_pending_unblocked_tasks(self, executor_service):
    """Submits every pending unblocked task and clears the pending list."""
    if self._pending_unblocked_tasks:
      with self._lock:
        for task in self._pending_unblocked_tasks:
          executor_service.submit(task)
        self._pending_unblocked_tasks = []

  def _commit_bundles(self, uncommitted_bundles):
    """Commits bundles and returns an immutable tuple of committed bundles."""
    for in_progress_bundle in uncommitted_bundles:
      producing_applied_ptransform = in_progress_bundle.pcollection.producer
      watermarks = self._watermark_manager.get_watermarks(
          producing_applied_ptransform)
      in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
    return tuple(uncommitted_bundles)

  def get_execution_context(self, applied_ptransform):
    """Builds an execution context from the transform's watermarks and state.

    The state is the one recorded by the last handled result of the transform,
    or None when no result has been handled for it yet.
    """
    return _InProcessExecutionContext(
        self._watermark_manager.get_watermarks(applied_ptransform),
        self._application_state_interals.get(applied_ptransform))

  def create_bundle(self, output_pcollection):
    """Create an uncommitted bundle for the specified PCollection."""
    return self._bundle_factory.create_bundle(output_pcollection)

  def create_empty_committed_bundle(self, output_pcollection):
    """Create empty bundle useful for triggering evaluation."""
    return self._bundle_factory.create_empty_committed_bundle(
        output_pcollection)

  def extract_fired_timers(self):
    """Delegates to the watermark manager's extract_fired_timers."""
    return self._watermark_manager.extract_fired_timers()

  def is_done(self, transform=None):
    """Checks completion of a step or the pipeline.

    Args:
      transform: AppliedPTransform to check for completion.

    Returns:
      True if the step will not produce additional output. If transform is None
      returns true if all steps are done.
    """
    if transform:
      return self._is_transform_done(transform)
    return all(self._is_transform_done(applied_ptransform)
               for applied_ptransform in self._step_names)

  def _is_transform_done(self, transform):
    """True when the transform's output watermark reached WATERMARK_POS_INF."""
    tw = self._watermark_manager.get_watermarks(transform)
    return tw.output_watermark == InProcessWatermarkManager.WATERMARK_POS_INF

  def get_value_or_schedule_after_output(self, pcollection_view, task):
    """Returns a side input value, or queues the task until it is available.

    Args:
      pcollection_view: the view whose value is requested.
      task: the TransformExecutor to queue if the view has no value yet.

    Returns:
      The (has_result, value) tuple given by the side inputs container.
    """
    assert isinstance(task, TransformExecutor)
    return self._side_inputs_container.get_value_or_schedule_after_output(
        pcollection_view, task)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py b/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
deleted file mode 100644
index 2136855..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_executor.py
+++ /dev/null
@@ -1,550 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""An executor that schedules and executes applied ptransforms."""
-
-from __future__ import absolute_import
-
-import collections
-import logging
-import Queue
-import threading
-import traceback
-from weakref import WeakValueDictionary
-
-
class ExecutorService(object):
  """Runs submitted tasks on a fixed pool of worker threads."""

  class CallableTask(object):
    """Base type of the work items accepted by submit()."""

    def __call__(self):
      pass

    @property
    def name(self):
      return None

  class ExecutorServiceWorker(threading.Thread):
    """Thread that takes tasks off the shared queue, one at a time."""

    # Longest single blocking wait for a queue item, in seconds.
    TIMEOUT = 5

    def __init__(self, queue, index):
      super(ExecutorService.ExecutorServiceWorker, self).__init__()
      self.queue = queue
      self._index = index
      self._default_name = 'ExecutorServiceWorker-' + str(index)
      self._update_name()
      self.shutdown_requested = False
      # The worker starts running as soon as it is constructed.
      self.start()

    def _update_name(self, task=None):
      """Renames the thread after the task it runs, or marks it as idle."""
      label = task.name if task and task.name else self._default_name
      status = 'executing' if task else 'idle'
      self.name = 'Thread: %d, %s (%s)' % (self._index, label, status)

    def _get_task_or_none(self):
      """Waits up to TIMEOUT seconds for a task; None when none arrived."""
      # The wait is bounded so that the run loop can notice a requested
      # shutdown instead of blocking forever.
      wait_seconds = ExecutorService.ExecutorServiceWorker.TIMEOUT
      try:
        return self.queue.get(timeout=wait_seconds)
      except Queue.Empty:
        return None

    def run(self):
      while not self.shutdown_requested:
        task = self._get_task_or_none()
        if not task:
          continue
        try:
          # A task fetched after shutdown was requested is dropped unrun.
          if not self.shutdown_requested:
            self._update_name(task)
            task()
            self._update_name()
        finally:
          self.queue.task_done()

    def shutdown(self):
      self.shutdown_requested = True

  def __init__(self, num_workers):
    worker_type = ExecutorService.ExecutorServiceWorker
    self.queue = Queue.Queue()
    self.workers = [worker_type(self.queue, index)
                    for index in range(num_workers)]
    self.shutdown_requested = False

  def submit(self, task):
    """Queues a CallableTask; ignored once shutdown has been requested."""
    assert isinstance(task, ExecutorService.CallableTask)
    if self.shutdown_requested:
      return
    self.queue.put(task)

  def await_completion(self):
    """Joins every worker thread."""
    for worker in self.workers:
      worker.join()

  def shutdown(self):
    """Stops accepting work, flags the workers and empties the queue."""
    self.shutdown_requested = True

    for worker in self.workers:
      worker.shutdown()

    # Discard whatever is still queued. The workers end on their own once
    # they are done with the task they are currently running.
    while not self.queue.empty():
      try:
        self.queue.get_nowait()
        self.queue.task_done()
      except Queue.Empty:
        pass
-
-
class TransformEvaluationState(object):
  """Records scheduled work and submits it to an executor service.

  Args:
    executor_service: object whose submit(work) receives scheduled work.
    scheduled: mutable set holding work that is scheduled but not completed.
  """

  def __init__(self, executor_service, scheduled):
    self.executor_service = executor_service
    self.scheduled = scheduled

  def schedule(self, work):
    """Records the work as scheduled and submits it for execution."""
    self.scheduled.add(work)
    self.executor_service.submit(work)

  def complete(self, completed_work):
    """Removes completed work from the scheduled set."""
    self.scheduled.remove(completed_work)


class ParallelEvaluationState(TransformEvaluationState):
  """A TransformEvaluationState with unlimited parallelism.

  Any TransformExecutor scheduled will be immediately submitted to the
  ExecutorService.

  A principal use of this is for evaluators that can generate output bundles
  only using the input bundle (e.g. ParDo).
  """
  pass


class SerialEvaluationState(TransformEvaluationState):
  """A TransformEvaluationState with a single work queue.

  Any TransformExecutor scheduled will be placed on the work queue. Only one
  item of work will be submitted to the ExecutorService at any time, and work
  is submitted in the order in which it was scheduled.

  A principal use of this is for evaluators that keeps a global state such as
  GroupByKeyOnly.
  """

  def __init__(self, executor_service, scheduled):
    super(SerialEvaluationState, self).__init__(executor_service, scheduled)
    self.serial_queue = collections.deque()
    self.currently_evaluating = None
    self._lock = threading.Lock()

  def complete(self, completed_work):
    """Marks the running work as done and starts the next queued work."""
    self._update_currently_evaluating(None, completed_work)
    super(SerialEvaluationState, self).complete(completed_work)

  def schedule(self, new_work):
    """Queues the work; it is submitted right away if nothing is running."""
    self._update_currently_evaluating(new_work, None)

  def _update_currently_evaluating(self, new_work, completed_work):
    """Enqueues and/or retires work, then submits the next item if idle.

    Args:
      new_work: work to append to the queue, or None.
      completed_work: the work that just finished, or None. It must be the
        work that is currently being evaluated.
    """
    with self._lock:
      if new_work:
        self.serial_queue.append(new_work)
      if completed_work:
        assert self.currently_evaluating == completed_work
        self.currently_evaluating = None
      if self.serial_queue and not self.currently_evaluating:
        # Take the oldest entry (FIFO). Taking the newest one would let work
        # that keeps arriving starve the work that was scheduled first.
        next_work = self.serial_queue.popleft()
        self.currently_evaluating = next_work
        super(SerialEvaluationState, self).schedule(next_work)
-
-
class TransformExecutorServices(object):
  """Hands out the evaluation states that schedule TransformExecutors.

  A single parallel state is shared by all steps, while serial states are
  created per step on demand. Every state records its work in one common set.
  """

  def __init__(self, executor_service):
    self._executor_service = executor_service
    self._scheduled = set()
    self._parallel = ParallelEvaluationState(
        self._executor_service, self._scheduled)
    # Serial states are cached per step and referenced weakly.
    self._serial_cache = WeakValueDictionary()

  def parallel(self):
    """Returns the shared parallel evaluation state."""
    return self._parallel

  def serial(self, step):
    """Returns the serial evaluation state of a step, creating it if needed."""
    state = self._serial_cache.get(step)
    if state:
      return state
    state = SerialEvaluationState(self._executor_service, self._scheduled)
    self._serial_cache[step] = state
    return state

  @property
  def executors(self):
    """Immutable snapshot of the work currently recorded as scheduled."""
    return frozenset(self._scheduled)
-
-
class _CompletionCallback(object):
  """Default callback invoked when a transform evaluation finishes.

  It is used to complete transform evaluations that are triggered by the
  arrival of elements from an upstream transform, or for a source transform.
  Results and failures are both reported as ExecutorUpdates on the update
  queue given at construction time.
  """

  def __init__(self, evaluation_context, all_updates, timers=None):
    self._timers = timers
    self._all_updates = all_updates
    self._evaluation_context = evaluation_context

  def handle_result(self, input_committed_bundle, transform_result):
    """Commits a result and reports each committed bundle as an update.

    Args:
      input_committed_bundle: the bundle that was evaluated.
      transform_result: the result produced by the evaluation.

    Returns:
      The committed bundles returned by the evaluation context.
    """
    committed = self._evaluation_context.handle_result(
        input_committed_bundle, self._timers, transform_result)
    for bundle in committed:
      update = _ExecutorServiceParallelExecutor.ExecutorUpdate(bundle, None)
      self._all_updates.offer(update)
    return committed

  def handle_exception(self, exception):
    """Reports a failed evaluation as an exception update."""
    failure = _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception)
    self._all_updates.offer(failure)
-
-
class _TimerCompletionCallback(_CompletionCallback):
  """A _CompletionCallback whose timers argument is mandatory."""

  def __init__(self, evaluation_context, all_updates, timers):
    super(_TimerCompletionCallback, self).__init__(
        evaluation_context, all_updates, timers=timers)
-
-
class TransformExecutor(ExecutorService.CallableTask):
  """TransformExecutor will evaluate a bundle using an applied ptransform.

  A CallableTask responsible for constructing a TransformEvaluator and
  evaluating it on some bundle of input, and registering the result using the
  completion callback.
  """

  def __init__(self, transform_evaluator_registry, evaluation_context,
               input_bundle, applied_transform, completion_callback,
               transform_evaluation_state):
    self._transform_evaluator_registry = transform_evaluator_registry
    self._evaluation_context = evaluation_context
    self._input_bundle = input_bundle
    self._applied_transform = applied_transform
    self._completion_callback = completion_callback
    self._transform_evaluation_state = transform_evaluation_state
    # Side input values fetched so far, kept across repeated calls.
    self._side_input_values = {}
    self.blocked = False
    self._call_count = 0

  def __call__(self):
    """Evaluates the input bundle once every side input is available.

    When a side input has no value yet the call returns early, leaving this
    executor queued on that side input; it is expected to be called again
    later. Otherwise the evaluator is run over the bundle and the outcome is
    passed to the completion callback.

    Returns:
      The evaluation result, or None when the call returned early or the
      evaluation raised an exception.
    """
    self._call_count += 1
    # At most one early return per side input, plus the final evaluation.
    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))

    for side_input in self._applied_transform.side_inputs:
      if side_input not in self._side_input_values:
        has_result, value = (
            self._evaluation_context.get_value_or_schedule_after_output(
                side_input, self))
        if not has_result:
          # Monitor task will reschedule this executor once the side input is
          # available.
          return
        self._side_input_values[side_input] = value

    side_input_values = [self._side_input_values[side_input]
                         for side_input in self._applied_transform.side_inputs]

    try:
      evaluator = self._transform_evaluator_registry.for_application(
          self._applied_transform, self._input_bundle, side_input_values)

      if self._input_bundle:
        for value in self._input_bundle.elements:
          evaluator.process_element(value)

      result = evaluator.finish_bundle()

      if self._evaluation_context.has_cache:
        for uncommitted_bundle in result.output_bundles:
          self._evaluation_context.append_to_cache(
              self._applied_transform, uncommitted_bundle.tag,
              uncommitted_bundle.elements)
        undeclared_tag_values = result.undeclared_tag_values
        if undeclared_tag_values:
          for tag, value in undeclared_tag_values.iteritems():
            self._evaluation_context.append_to_cache(
                self._applied_transform, tag, value)

      self._completion_callback.handle_result(self._input_bundle, result)
      return result
    except Exception as e:  # pylint: disable=broad-except
      # The formatted traceback is already part of the message, so exc_info
      # is not passed: it would append the same traceback a second time.
      logging.warning('Task failed: %s', traceback.format_exc())
      self._completion_callback.handle_exception(e)
    finally:
      # Runs for success and failure alike, but not for the early return
      # above, which happens before this try statement is entered.
      self._transform_evaluation_state.complete(self)
-
-
class InProcessExecutor(object):
  """Thin facade over _ExecutorServiceParallelExecutor.

  All constructor arguments are forwarded unchanged to the wrapped executor.
  """

  def __init__(self, *args, **kwargs):
    self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs)

  def start(self, roots):
    """Starts the wrapped executor with the given roots."""
    wrapped = self._executor
    wrapped.start(roots)

  def await_completion(self):
    """Delegates to the wrapped executor's await_completion."""
    wrapped = self._executor
    wrapped.await_completion()
-
-
class _ExecutorServiceParallelExecutor(object):
  """An internal implementation for InProcessExecutor."""

  NUM_WORKERS = 1

  def __init__(self, value_to_consumers, transform_evaluator_registry,
               evaluation_context):
    self.executor_service = ExecutorService(
        _ExecutorServiceParallelExecutor.NUM_WORKERS)
    self.transform_executor_services = TransformExecutorServices(
        self.executor_service)
    self.value_to_consumers = value_to_consumers
    self.transform_evaluator_registry = transform_evaluator_registry
    self.evaluation_context = evaluation_context
    # Updates from completed evaluations, consumed by the monitor task.
    self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
        _ExecutorServiceParallelExecutor.ExecutorUpdate)
    # Updates consumed by await_completion.
    self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
        _ExecutorServiceParallelExecutor.VisibleExecutorUpdate)
    self.default_completion_callback = _CompletionCallback(
        evaluation_context, self.all_updates)

  def start(self, roots):
    """Remembers the root transforms and submits the monitor task."""
    self.root_nodes = frozenset(roots)
    self.executor_service.submit(
        _ExecutorServiceParallelExecutor._MonitorTask(self))

  def await_completion(self):
    """Blocks until a visible update arrives, then shuts the service down.

    Raises:
      The exception carried by the update, if it carries one.
    """
    update = self.visible_updates.take()
    try:
      if update.exception:
        raise update.exception
    finally:
      self.executor_service.shutdown()

  def schedule_consumers(self, committed_bundle):
    """Schedules every consumer of the bundle's PCollection on the bundle."""
    if committed_bundle.pcollection in self.value_to_consumers:
      consumers = self.value_to_consumers[committed_bundle.pcollection]
      for applied_ptransform in consumers:
        self.schedule_consumption(applied_ptransform, committed_bundle,
                                  self.default_completion_callback)

  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
                           on_complete):
    """Schedules evaluation of the given bundle with the transform.

    Args:
      consumer_applied_transform: the transform that will consume the bundle.
      committed_bundle: the bundle to evaluate; may only be omitted for a
        root transform.
      on_complete: completion callback handed to the TransformExecutor.
    """
    assert all([consumer_applied_transform, on_complete])
    assert committed_bundle or consumer_applied_transform in self.root_nodes
    if (committed_bundle
        and self.transform_evaluator_registry.should_execute_serially(
            consumer_applied_transform)):
      transform_executor_service = self.transform_executor_services.serial(
          consumer_applied_transform)
    else:
      transform_executor_service = self.transform_executor_services.parallel()

    transform_executor = TransformExecutor(
        self.transform_evaluator_registry, self.evaluation_context,
        committed_bundle, consumer_applied_transform, on_complete,
        transform_executor_service)
    transform_executor_service.schedule(transform_executor)

  class _TypedUpdateQueue(object):
    """Type checking update queue with blocking and non-blocking operations."""

    def __init__(self, item_type):
      self._item_type = item_type
      self._queue = Queue.Queue()

    def poll(self):
      """Returns the next item without blocking, or None if there is none."""
      try:
        item = self._queue.get_nowait()
        self._queue.task_done()
        return item
      except Queue.Empty:
        return None

    def take(self):
      """Blocks until an item is available and returns it."""
      item = self._queue.get()
      self._queue.task_done()
      return item

    def offer(self, item):
      """Adds an item, which must be an instance of the queue's item type."""
      assert isinstance(item, self._item_type)
      self._queue.put_nowait(item)

  class ExecutorUpdate(object):
    """An internal status update on the state of the executor."""

    def __init__(self, produced_bundle=None, exception=None):
      # Exactly one of them should be not-None
      assert bool(produced_bundle) != bool(exception)
      self.committed_bundle = produced_bundle
      self.exception = exception

  class VisibleExecutorUpdate(object):
    """An update of interest to the user.

    Used for awaiting the completion to decide whether to return normally or
    raise an exception.
    """

    def __init__(self, exception=None):
      # NOTE(review): finished is True only when an exception is given, so an
      # update that reports normal completion has finished == False. Confirm
      # that this is intended.
      self.finished = exception is not None
      self.exception = exception

  class _MonitorTask(ExecutorService.CallableTask):
    """MonitorTask continuously runs to ensure that pipeline makes progress."""

    def __init__(self, executor):
      self._executor = executor

    @property
    def name(self):
      return 'monitor'

    def __call__(self):
      try:
        # Drain the updates of completed evaluations: schedule the consumers
        # of new bundles and surface failures as visible updates.
        update = self._executor.all_updates.poll()
        while update:
          if update.committed_bundle:
            self._executor.schedule_consumers(update.committed_bundle)
          else:
            assert update.exception
            logging.warning('A task failed with exception.\n %s',
                            update.exception)
            self._executor.visible_updates.offer(
                _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
                    update.exception))
          update = self._executor.all_updates.poll()
        self._executor.evaluation_context.schedule_pending_unblocked_tasks(
            self._executor.executor_service)
        self._add_work_if_necessary(self._fire_timers())
      except Exception as e:  # pylint: disable=broad-except
        logging.error('Monitor task died due to exception.\n %s', e)
        self._executor.visible_updates.offer(
            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
      finally:
        # The monitor keeps itself alive by resubmitting itself until the
        # pipeline reaches a terminal state.
        if not self._should_shutdown():
          self._executor.executor_service.submit(self)

    def _should_shutdown(self):
      """_should_shutdown checks whether pipeline is completed or not.

      It will check for successful completion by checking the watermarks of all
      transforms. If they all reached the maximum watermark it means that
      pipeline successfully reached to completion.

      If the above is not true, it will check that at least one executor is
      making progress. Otherwise pipeline will be declared stalled.

      If the pipeline reached to a terminal state as explained above
      _should_shutdown will request executor to gracefully shutdown.

      Returns:
        True if pipeline reached a terminal state and monitor task could finish.
        Otherwise monitor task should schedule itself again for future
        execution.
      """
      if self._executor.evaluation_context.is_done():
        self._executor.visible_updates.offer(
            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
        self._executor.executor_service.shutdown()
        return True
      # _is_executing must be called here: the bound method object itself is
      # always truthy, which would make this stall branch unreachable.
      elif not self._is_executing():
        self._executor.visible_updates.offer(
            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
                Exception('Monitor task detected a pipeline stall.')))
        self._executor.executor_service.shutdown()
        return True
      return False

    def _fire_timers(self):
      """Schedules triggered consumers if any timers fired.

      Returns:
        True if timers fired.
      """
      fired_timers = self._executor.evaluation_context.extract_fired_timers()
      for applied_ptransform in fired_timers:
        # Use an empty committed bundle. just to trigger.
        empty_bundle = (
            self._executor.evaluation_context.create_empty_committed_bundle(
                applied_ptransform.inputs[0]))
        # NOTE(review): the applied transform is passed in the callback's
        # `timers` position; confirm that this is what the callback expects.
        timer_completion_callback = _TimerCompletionCallback(
            self._executor.evaluation_context, self._executor.all_updates,
            applied_ptransform)

        self._executor.schedule_consumption(
            applied_ptransform, empty_bundle, timer_completion_callback)
      return bool(fired_timers)

    def _is_executing(self):
      """Returns True if there is at least one non-blocked TransformExecutor."""
      for transform_executor in (
          self._executor.transform_executor_services.executors):
        if not transform_executor.blocked:
          return True
      return False

    def _add_work_if_necessary(self, timers_fired):
      """Adds more work from the roots if pipeline requires more input.

      If all active TransformExecutors are in a blocked state, add more work
      from root nodes that may have additional work. This ensures that if a
      pipeline has elements available from the root nodes it will add those
      elements when necessary.

      Args:
        timers_fired: True if any timers fired prior to this call.
      """
      # If any timers have fired, they will add more work; No need to add more.
      if timers_fired:
        return

      if self._is_executing():
        # We have at least one executor that can proceed without adding
        # additional work.
        return

      # All current TransformExecutors are blocked; add more work from the
      # roots.
      for applied_transform in self._executor.root_nodes:
        if not self._executor.evaluation_context.is_done(applied_transform):
          self._executor.schedule_consumption(
              applied_transform, None,
              self._executor.default_completion_callback)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
deleted file mode 100644
index 287c170..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""InProcessPipelineRunner, executing on the local machine."""
-
-from __future__ import absolute_import
-
-import collections
-import logging
-
-from apache_beam.runners.inprocess.bundle_factory import BundleFactory
-from apache_beam.runners.inprocess.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
-from apache_beam.runners.inprocess.inprocess_evaluation_context import InProcessEvaluationContext
-from apache_beam.runners.inprocess.inprocess_executor import InProcessExecutor
-from apache_beam.runners.inprocess.transform_evaluator import TransformEvaluatorRegistry
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-
-
-class InProcessPipelineRunner(PipelineRunner):
- """Executes a single pipeline on the local machine."""
-
- def __init__(self):
- self._cache = None
-
- def run(self, pipeline):
- """Execute the entire pipeline and returns an InProcessPipelineResult."""
- logging.info('Running pipeline with InProcessPipelineRunner.')
- self.visitor = ConsumerTrackingPipelineVisitor()
- pipeline.visit(self.visitor)
-
- evaluation_context = InProcessEvaluationContext(
- pipeline.options,
- BundleFactory(),
- self.visitor.root_transforms,
- self.visitor.value_to_consumers,
- self.visitor.step_names,
- self.visitor.views)
-
- evaluation_context.use_pvalue_cache(self._cache)
-
- executor = InProcessExecutor(self.visitor.value_to_consumers,
- TransformEvaluatorRegistry(evaluation_context),
- evaluation_context)
- # Start the executor. This is a non-blocking call, it will start the
- # execution in background threads and return.
- executor.start(self.visitor.root_transforms)
- result = InProcessPipelineResult(executor, evaluation_context)
-
- # TODO(altay): If blocking:
- # Block until the pipeline completes. This call will return after the
- # pipeline was fully terminated (successfully or with a failure).
- result.await_completion()
-
- if self._cache:
- self._cache.finalize()
-
- return result
-
- @property
- def cache(self):
- if not self._cache:
- self._cache = InProcessBufferingInMemoryCache()
- return self._cache.pvalue_cache
-
- def apply(self, transform, input): # pylint: disable=redefined-builtin
- """Runner callback for a pipeline.apply call."""
- return transform.apply(input)
-
-
-class InProcessBufferingInMemoryCache(object):
- """PValueCache wrapper for buffering bundles until a PValue is fully computed.
-
- InProcessBufferingInMemoryCache keeps an in memory cache of
- (applied_ptransform, tag) tuples. It accepts appending to existing cache
- entries until it is finalized. finalize() will make all the existing cached
- entries visible to the underyling PValueCache in their entirety, clean the in
- memory cache and stop accepting new cache entries.
- """
-
- def __init__(self):
- self._cache = collections.defaultdict(list)
- self._pvalue_cache = PValueCache()
- self._finalized = False
-
- @property
- def pvalue_cache(self):
- return self._pvalue_cache
-
- def append(self, applied_ptransform, tag, elements):
- assert not self._finalized
- assert elements is not None
- self._cache[(applied_ptransform, tag)].extend(elements)
-
- def finalize(self):
- """Make buffered cache elements visible to the underlying PValueCache."""
- assert not self._finalized
- for key, value in self._cache.iteritems():
- applied_ptransform, tag = key
- self._pvalue_cache.cache_output(applied_ptransform, tag, value)
- self._cache = None
-
-
-class InProcessPipelineResult(PipelineResult):
- """A InProcessPipelineResult provides access to info about a pipeline."""
-
- def __init__(self, executor, evaluation_context):
- super(InProcessPipelineResult, self).__init__(PipelineState.RUNNING)
- self._executor = executor
- self._evaluation_context = evaluation_context
-
- def _is_in_terminal_state(self):
- return self._state is not PipelineState.RUNNING
-
- def await_completion(self):
- if not self._is_in_terminal_state():
- try:
- self._executor.await_completion()
- self._state = PipelineState.DONE
- except: # pylint: disable=broad-except
- self._state = PipelineState.FAILED
- raise
- return self._state
-
- def aggregated_values(self, aggregator_or_name):
- return self._evaluation_context.get_aggregator_values(aggregator_or_name)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
deleted file mode 100644
index aa9db24..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
+++ /dev/null
@@ -1,121 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for InProcessPipelineRunner."""
-
-import logging
-import unittest
-
-from apache_beam import Pipeline
-import apache_beam.examples.snippets.snippets_test as snippets_test
-import apache_beam.io.fileio_test as fileio_test
-import apache_beam.io.textio_test as textio_test
-import apache_beam.io.sources_test as sources_test
-import apache_beam.pipeline_test as pipeline_test
-import apache_beam.pvalue_test as pvalue_test
-from apache_beam.runners.inprocess.inprocess_runner import InProcessPipelineRunner
-import apache_beam.transforms.aggregator_test as aggregator_test
-import apache_beam.transforms.combiners_test as combiners_test
-import apache_beam.transforms.ptransform_test as ptransform_test
-import apache_beam.transforms.trigger_test as trigger_test
-import apache_beam.transforms.window_test as window_test
-import apache_beam.transforms.write_ptransform_test as write_ptransform_test
-import apache_beam.typehints.typed_pipeline_test as typed_pipeline_test
-
-
-class TestWithInProcessPipelineRunner(object):
-
- def setUp(self):
- original_init = Pipeline.__init__
-
- def override_pipeline_init(self, runner=None, options=None, argv=None):
- runner = InProcessPipelineRunner()
- return original_init(self, runner, options, argv)
-
- self.runner_name = None
- self.original_init = original_init
- Pipeline.__init__ = override_pipeline_init
-
- def tearDown(self):
- Pipeline.__init__ = self.original_init
-
-
-class InProcessPipelineRunnerPipelineTest(
- TestWithInProcessPipelineRunner, pipeline_test.PipelineTest):
-
- def test_cached_pvalues_are_refcounted(self):
- # InProcessPipelineRunner does not have a refcounted cache.
- pass
-
- def test_eager_pipeline(self):
- # Tests eager runner only
- pass
-
-
-class InProcessPipelineRunnerSnippetsTest(
- TestWithInProcessPipelineRunner, snippets_test.SnippetsTest,
- snippets_test.ParDoTest, snippets_test.TypeHintsTest,
- snippets_test.CombineTest):
- pass
-
-
-class InProcessPipelineRunnerTransform(
- TestWithInProcessPipelineRunner, aggregator_test.AggregatorTest,
- combiners_test.CombineTest, ptransform_test.PTransformTest,
- pvalue_test.PValueTest, window_test.WindowTest,
- typed_pipeline_test.MainInputTest, typed_pipeline_test.SideInputTest,
- typed_pipeline_test.CustomTransformTest, trigger_test.TriggerPipelineTest,
- write_ptransform_test.WriteTest):
- pass
-
-
-class TestTextFileSource(
- TestWithInProcessPipelineRunner, fileio_test.TestTextFileSource):
- pass
-
-
-class TestNativeTextFileSink(
- TestWithInProcessPipelineRunner, fileio_test.TestNativeTextFileSink):
-
- def setUp(self):
- TestWithInProcessPipelineRunner.setUp(self)
- fileio_test.TestNativeTextFileSink.setUp(self)
-
-
-class TestTextFileSink(
- TestWithInProcessPipelineRunner, textio_test.TextSinkTest):
-
- def setUp(self):
- TestWithInProcessPipelineRunner.setUp(self)
- textio_test.TextSinkTest.setUp(self)
-
-
-class MyFileSink(TestWithInProcessPipelineRunner, fileio_test.MyFileSink):
- pass
-
-
-class TestFileSink(TestWithInProcessPipelineRunner, fileio_test.TestFileSink):
- pass
-
-
-class SourcesTest(TestWithInProcessPipelineRunner, sources_test.SourcesTest):
- pass
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.DEBUG)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py b/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
deleted file mode 100644
index 798ebfb..0000000
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_transform_result.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-from __future__ import absolute_import
-
-
-class InProcessTransformResult(object):
- """The result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
- def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
- timer_update, counters, watermark_hold,
- undeclared_tag_values=None):
- self._applied_ptransform = applied_ptransform
- self._uncommitted_output_bundles = uncommitted_output_bundles
- self._state = state
- self._timer_update = timer_update
- self._counters = counters
- self._watermark_hold = watermark_hold
- # Only used when caching (materializing) all values is requested.
- self._undeclared_tag_values = undeclared_tag_values
-
- @property
- def transform(self):
- return self._applied_ptransform
-
- @property
- def output_bundles(self):
- return self._uncommitted_output_bundles
-
- @property
- def state(self):
- return self._state
-
- @property
- def counters(self):
- return self._counters
-
- @property
- def watermark_hold(self):
- return self._watermark_hold
-
- @property
- def undeclared_tag_values(self):
- return self._undeclared_tag_values