You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/26 23:29:04 UTC
[1/2] beam git commit: Refactoring metrics infrastructure
Repository: beam
Updated Branches:
refs/heads/python-sdk e3849af8c -> 3d6f20d67
Refactoring metrics infrastructure
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b148f5cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b148f5cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b148f5cc
Branch: refs/heads/python-sdk
Commit: b148f5cc9f3e414b9cd1f605b25d50e21f626b7a
Parents: e3849af
Author: Pablo <pa...@google.com>
Authored: Mon Jan 23 17:50:21 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jan 26 15:28:49 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/metrics/execution.pxd | 31 +++++++++
sdks/python/apache_beam/metrics/execution.py | 70 ++++++++++++--------
sdks/python/apache_beam/runners/common.pxd | 2 +
sdks/python/apache_beam/runners/common.py | 11 ++-
.../apache_beam/runners/direct/executor.py | 12 ++--
.../runners/direct/transform_evaluator.py | 54 ++++++++-------
sdks/python/setup.py | 1 +
7 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd
new file mode 100644
index 0000000..d89004f
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution.pxd
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cimport cython
+
+
+cdef class MetricsContainer(object):
+ cdef object step_name
+ cdef public object counters
+ cdef public object distributions
+
+
+cdef class ScopedMetricsContainer(object):
+ cpdef enter(self)
+ cpdef exit(self)
+ cdef list _stack
+ cdef MetricsContainer _container
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 8f04b7b..3ba1735 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -98,36 +98,49 @@ class MetricResult(object):
self.key, self.committed, self.attempted)
-class MetricsEnvironment(object):
+class _MetricsEnvironment(object):
"""Holds the MetricsContainer for every thread and other metric information.
This class is not meant to be instantiated, instead being used to keep
track of global state.
"""
- METRICS_SUPPORTED = False
- _METRICS_SUPPORTED_LOCK = threading.Lock()
-
- PER_THREAD = threading.local()
+ def __init__(self):
+ self.METRICS_SUPPORTED = False
+ self._METRICS_SUPPORTED_LOCK = threading.Lock()
+ self.PER_THREAD = threading.local()
+ self.set_container_stack()
+
+ def set_container_stack(self):
+ if not hasattr(self.PER_THREAD, 'container'):
+ self.PER_THREAD.container = []
+
+ def container_stack(self):
+ self.set_container_stack()
+ return self.PER_THREAD.container
+
+ def set_metrics_supported(self, supported):
+ self.set_container_stack()
+ with self._METRICS_SUPPORTED_LOCK:
+ self.METRICS_SUPPORTED = supported
+
+ def current_container(self):
+ self.set_container_stack()
+ index = len(self.PER_THREAD.container) - 1
+ if index < 0:
+ return None
+ else:
+ return self.PER_THREAD.container[index]
- @classmethod
- def set_metrics_supported(cls, supported):
- with cls._METRICS_SUPPORTED_LOCK:
- cls.METRICS_SUPPORTED = supported
+ def set_current_container(self, container):
+ self.set_container_stack()
+ self.PER_THREAD.container.append(container)
- @classmethod
- def current_container(cls):
- try:
- return cls.PER_THREAD.container
- except AttributeError:
- return None
+ def unset_current_container(self):
+ self.set_container_stack()
+ self.PER_THREAD.container.pop()
- @classmethod
- def set_current_container(cls, container):
- cls.PER_THREAD.container = container
- @classmethod
- def unset_current_container(cls):
- del cls.PER_THREAD.container
+MetricsEnvironment = _MetricsEnvironment()
class MetricsContainer(object):
@@ -180,16 +193,21 @@ class MetricsContainer(object):
class ScopedMetricsContainer(object):
- def __init__(self, container):
- self._old_container = MetricsEnvironment.current_container()
+ def __init__(self, container=None):
+ self._stack = MetricsEnvironment.container_stack()
self._container = container
+ def enter(self):
+ self._stack.append(self._container)
+
+ def exit(self):
+ self._stack.pop()
+
def __enter__(self):
- MetricsEnvironment.set_current_container(self._container)
- return self._container
+ self.enter()
def __exit__(self, type, value, traceback):
- MetricsEnvironment.set_current_container(self._old_container)
+ self.exit()
class MetricUpdates(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 10d1f96..f41b313 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -18,6 +18,7 @@
cimport cython
from apache_beam.utils.windowed_value cimport WindowedValue
+from apache_beam.metrics.execution cimport ScopedMetricsContainer
cdef type SideOutputValue, TimestampedValue
@@ -40,6 +41,7 @@ cdef class DoFnRunner(Receiver):
cdef object args
cdef dict kwargs
cdef object side_inputs
+ cdef ScopedMetricsContainer scoped_metrics_container
cdef bint has_windowed_side_inputs
cdef Receiver main_receivers
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3741582..cb47513 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -22,6 +22,7 @@
import sys
from apache_beam.internal import util
+from apache_beam.metrics.execution import ScopedMetricsContainer
from apache_beam.pvalue import SideOutputValue
from apache_beam.transforms import core
from apache_beam.transforms import window
@@ -69,7 +70,8 @@ class DoFnRunner(Receiver):
logging_context=None,
# Preferred alternative to context
# TODO(robertwb): Remove once all runners are updated.
- state=None):
+ state=None,
+ scoped_metrics_container=None):
"""Initializes a DoFnRunner.
Args:
@@ -84,10 +86,13 @@ class DoFnRunner(Receiver):
step_name: the name of this step
logging_context: a LoggingContext object
state: handle for accessing DoFn state
+ scoped_metrics_container: Context switcher for metrics container
"""
self.step_name = step_name
self.window_fn = windowing.windowfn
self.tagged_receivers = tagged_receivers
+ self.scoped_metrics_container = (scoped_metrics_container
+ or ScopedMetricsContainer())
global_window = window.GlobalWindow()
@@ -236,6 +241,7 @@ class DoFnRunner(Receiver):
def _invoke_bundle_method(self, method):
try:
self.logging_context.enter()
+ self.scoped_metrics_container.enter()
self.context.set_element(None)
f = getattr(self.dofn, method)
@@ -251,6 +257,7 @@ class DoFnRunner(Receiver):
except BaseException as exn:
self.reraise_augmented(exn)
finally:
+ self.scoped_metrics_container.exit()
self.logging_context.exit()
def start(self):
@@ -262,6 +269,7 @@ class DoFnRunner(Receiver):
def process(self, element):
try:
self.logging_context.enter()
+ self.scoped_metrics_container.enter()
if self.is_new_dofn:
self.new_dofn_process(element)
else:
@@ -269,6 +277,7 @@ class DoFnRunner(Receiver):
except BaseException as exn:
self.reraise_augmented(exn)
finally:
+ self.scoped_metrics_container.exit()
self.logging_context.exit()
def reraise_augmented(self, exn):
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 7e404f8..2d4a8bd 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -27,7 +27,7 @@ import traceback
from weakref import WeakValueDictionary
from apache_beam.metrics.execution import MetricsContainer
-from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.execution import ScopedMetricsContainer
class ExecutorService(object):
@@ -270,7 +270,7 @@ class TransformExecutor(ExecutorService.CallableTask):
self._call_count += 1
assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
metrics_container = MetricsContainer(self._applied_transform.full_label)
- MetricsEnvironment.set_current_container(metrics_container)
+ scoped_metrics_container = ScopedMetricsContainer(metrics_container)
for side_input in self._applied_transform.side_inputs:
if side_input not in self._side_input_values:
@@ -288,14 +288,16 @@ class TransformExecutor(ExecutorService.CallableTask):
try:
evaluator = self._transform_evaluator_registry.for_application(
- self._applied_transform, self._input_bundle, side_input_values)
+ self._applied_transform, self._input_bundle,
+ side_input_values, scoped_metrics_container)
if self._input_bundle:
for value in self._input_bundle.elements:
evaluator.process_element(value)
- result = evaluator.finish_bundle()
- result.metric_updates = metrics_container.get_cumulative()
+ with scoped_metrics_container:
+ result = evaluator.finish_bundle()
+ result.metric_updates = metrics_container.get_cumulative()
if self._evaluation_context.has_cache:
for uncommitted_bundle in result.output_bundles:
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index e8d8c4c..13c87c5 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -61,7 +61,8 @@ class TransformEvaluatorRegistry(object):
}
def for_application(
- self, applied_ptransform, input_committed_bundle, side_inputs):
+ self, applied_ptransform, input_committed_bundle,
+ side_inputs, scoped_metrics_container):
"""Returns a TransformEvaluator suitable for processing given inputs."""
assert applied_ptransform
assert bool(applied_ptransform.side_inputs) == bool(side_inputs)
@@ -78,7 +79,8 @@ class TransformEvaluatorRegistry(object):
'Execution of [%s] not implemented in runner %s.' % (
type(applied_ptransform.transform), self))
return evaluator(self._evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs)
+ input_committed_bundle, side_inputs,
+ scoped_metrics_container)
def should_execute_serially(self, applied_ptransform):
"""Returns True if this applied_ptransform should run one bundle at a time.
@@ -108,7 +110,7 @@ class _TransformEvaluator(object):
"""An evaluator of a specific application of a transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
self._evaluation_context = evaluation_context
self._applied_ptransform = applied_ptransform
self._input_committed_bundle = input_committed_bundle
@@ -116,7 +118,9 @@ class _TransformEvaluator(object):
self._expand_outputs()
self._execution_context = evaluation_context.get_execution_context(
applied_ptransform)
- self.start_bundle()
+ self.scoped_metrics_container = scoped_metrics_container
+ with scoped_metrics_container:
+ self.start_bundle()
def _expand_outputs(self):
outputs = set()
@@ -176,14 +180,14 @@ class _BoundedReadEvaluator(_TransformEvaluator):
MAX_ELEMENT_PER_BUNDLE = 100
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not input_committed_bundle
assert not side_inputs
self._source = applied_ptransform.transform.source
self._source.pipeline_options = evaluation_context.pipeline_options
super(_BoundedReadEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
def finish_bundle(self):
assert len(self._outputs) == 1
@@ -213,11 +217,11 @@ class _FlattenEvaluator(_TransformEvaluator):
"""TransformEvaluator for Flatten transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not side_inputs
super(_FlattenEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
def start_bundle(self):
assert len(self._outputs) == 1
@@ -237,12 +241,12 @@ class _CreateEvaluator(_TransformEvaluator):
"""TransformEvaluator for Create transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not input_committed_bundle
assert not side_inputs
super(_CreateEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
def start_bundle(self):
assert len(self._outputs) == 1
@@ -311,10 +315,10 @@ class _ParDoEvaluator(_TransformEvaluator):
"""TransformEvaluator for ParDo transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
super(_ParDoEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
def start_bundle(self):
transform = self._applied_ptransform.transform
@@ -358,12 +362,14 @@ class _ParDoEvaluator(_TransformEvaluator):
dofn, self._applied_ptransform.full_label)
else:
dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
- self.runner = DoFnRunner(dofn, transform.args, transform.kwargs,
- self._side_inputs,
- self._applied_ptransform.inputs[0].windowing,
- tagged_receivers=self._tagged_receivers,
- step_name=self._applied_ptransform.full_label,
- state=DoFnState(self._counter_factory))
+ self.runner = DoFnRunner(
+ dofn, transform.args, transform.kwargs,
+ self._side_inputs,
+ self._applied_ptransform.inputs[0].windowing,
+ tagged_receivers=self._tagged_receivers,
+ step_name=self._applied_ptransform.full_label,
+ state=DoFnState(self._counter_factory),
+ scoped_metrics_container=self.scoped_metrics_container)
self.runner.start()
def process_element(self, element):
@@ -391,11 +397,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
self.completed = False
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not side_inputs
super(_GroupByKeyOnlyEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
@property
def _is_final_bundle(self):
@@ -463,11 +469,11 @@ class _CreatePCollectionViewEvaluator(_TransformEvaluator):
"""TransformEvaluator for CreatePCollectionView transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not side_inputs
super(_CreatePCollectionViewEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
@property
def _is_final_bundle(self):
@@ -509,11 +515,11 @@ class _NativeWriteEvaluator(_TransformEvaluator):
"""TransformEvaluator for _NativeWrite transform."""
def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs):
+ input_committed_bundle, side_inputs, scoped_metrics_container):
assert not side_inputs
super(_NativeWriteEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs)
+ side_inputs, scoped_metrics_container)
assert applied_ptransform.transform.sink
self._sink = applied_ptransform.transform.sink
http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1fd622f..37125c2 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -118,6 +118,7 @@ setuptools.setup(
'**/*.pyx',
'apache_beam/coders/coder_impl.py',
'apache_beam/runners/common.py',
+ 'apache_beam/metrics/execution.py',
'apache_beam/transforms/cy_combiners.py',
'apache_beam/utils/counters.py',
'apache_beam/utils/windowed_value.py',
[2/2] beam git commit: Closes #1835
Posted by ro...@apache.org.
Closes #1835
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d6f20d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d6f20d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d6f20d6
Branch: refs/heads/python-sdk
Commit: 3d6f20d677bd7397e6e2d099c829bf4c439f8d18
Parents: e3849af b148f5c
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jan 26 15:28:50 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jan 26 15:28:50 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/metrics/execution.pxd | 31 +++++++++
sdks/python/apache_beam/metrics/execution.py | 70 ++++++++++++--------
sdks/python/apache_beam/runners/common.pxd | 2 +
sdks/python/apache_beam/runners/common.py | 11 ++-
.../apache_beam/runners/direct/executor.py | 12 ++--
.../runners/direct/transform_evaluator.py | 54 ++++++++-------
sdks/python/setup.py | 1 +
7 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------