You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/12/30 00:25:47 UTC
[1/2] beam git commit: [BEAM-147] Adding Metrics API to Python SDK
Repository: beam
Updated Branches:
refs/heads/python-sdk 8bf4c8059 -> d1906416e
[BEAM-147] Adding Metrics API to Python SDK
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/88bad042
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/88bad042
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/88bad042
Branch: refs/heads/python-sdk
Commit: 88bad042a0de8be89786c3e1c909f18b3be088f5
Parents: 8bf4c80
Author: Pablo <pa...@google.com>
Authored: Tue Nov 22 14:31:26 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Thu Dec 29 16:02:31 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/metrics/__init__.py | 17 +
sdks/python/apache_beam/metrics/cells.py | 315 +++++++++++++++++++
sdks/python/apache_beam/metrics/cells_test.py | 143 +++++++++
sdks/python/apache_beam/metrics/execution.py | 210 +++++++++++++
.../apache_beam/metrics/execution_test.py | 131 ++++++++
sdks/python/apache_beam/metrics/metric.py | 165 ++++++++++
sdks/python/apache_beam/metrics/metric_test.py | 85 +++++
sdks/python/apache_beam/metrics/metricbase.py | 81 +++++
.../runners/direct/direct_metrics.py | 112 +++++++
.../runners/direct/direct_metrics_test.py | 211 +++++++++++++
.../apache_beam/runners/direct/direct_runner.py | 5 +
.../runners/direct/evaluation_context.py | 9 +
.../apache_beam/runners/direct/executor.py | 9 +
.../runners/direct/transform_result.py | 4 +
sdks/python/apache_beam/runners/runner_test.py | 43 +++
15 files changed, 1540 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/__init__.py b/sdks/python/apache_beam/metrics/__init__.py
new file mode 100644
index 0000000..164d1a8
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from apache_beam.metrics.metric import Metrics
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
new file mode 100644
index 0000000..5a571f5
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This file contains metric cell classes. A metric cell is used to accumulate
+in-memory changes to a metric. It represents a specific metric in a single
+context.
+
+Cells depend on a 'dirty-bit' in the CellCommitState class that tracks whether
+a cell's updates have been committed.
+"""
+
+import threading
+
+from apache_beam.metrics.metricbase import Counter
+from apache_beam.metrics.metricbase import Distribution
+
+
+class CellCommitState(object):
+ """Atomically tracks a cell's dirty/clean commit status.
+
+ Reporting a metric update works in a two-step process: First, updates to the
+ metric are received, and the metric is marked as 'dirty'. Later, updates are
+ committed, and then the cell may be marked as 'clean'.
+
+ The tracking of a cell's state is done conservatively: A metric may be
+ reported DIRTY even if updates have not occurred.
+
+ This class is thread-safe.
+ """
+
+ # Indicates that there have been changes to the cell since the last commit.
+ DIRTY = 0
+ # Indicates that there have NOT been changes to the cell since last commit.
+ CLEAN = 1
+ # Indicates that a commit of the current value is in progress.
+ COMMITTING = 2
+
+ def __init__(self):
+ """Initializes ``CellCommitState``.
+
+ A cell is initialized as dirty.
+ """
+ self._lock = threading.Lock()
+ self._state = CellCommitState.DIRTY
+
+ @property
+ def state(self):
+ with self._lock:
+ return self._state
+
+ def after_modification(self):
+ """Indicate that changes have been made to the metric being tracked.
+
+ Should be called after modification of the metric value.
+ """
+ with self._lock:
+ self._state = CellCommitState.DIRTY
+
+ def after_commit(self):
+ """Mark changes made up to the last call to ``before_commit`` as committed.
+
+ The next call to ``before_commit`` will return ``False`` unless there have
+ been changes made.
+ """
+ with self._lock:
+ if self._state == CellCommitState.COMMITTING:
+ self._state = CellCommitState.CLEAN
+
+ def before_commit(self):
+ """Check the dirty state, and mark the metric as committing.
+
+ After this call, the state is either CLEAN, or COMMITTING. If the state
+ was already CLEAN, then we simply return. If it was either DIRTY or
+ COMMITTING, then we set the cell as COMMITTING (e.g. in the middle of
+ a commit).
+
+ After a commit is successful, ``after_commit`` should be called.
+
+ Returns:
+ A boolean, which is false if the cell is CLEAN, and true otherwise.
+ """
+ with self._lock:
+ if self._state == CellCommitState.CLEAN:
+ return False
+ else:
+ self._state = CellCommitState.COMMITTING
+ return True
+
+
+class MetricCell(object):
+ """Accumulates in-memory changes to a metric.
+
+ A MetricCell represents a specific metric in a single context and bundle.
+ All subclasses must be thread safe, as these are used in the pipeline runners,
+ and may be subject to parallel/concurrent updates. Cells should only be used
+ directly within a runner.
+ """
+ def __init__(self):
+ self.commit = CellCommitState()
+ self._lock = threading.Lock()
+
+ def get_cumulative(self):
+ raise NotImplementedError
+
+
+class CounterCell(Counter, MetricCell):
+ """Tracks the current value and delta of a counter metric.
+
+ Each cell tracks the state of a metric independently per context per bundle.
+ Therefore, each metric has a different cell in each bundle; cells are
+ aggregated by the runner.
+
+ This class is thread safe.
+ """
+ def __init__(self, *args):
+ super(CounterCell, self).__init__(*args)
+ self.value = 0
+
+ def combine(self, other):
+ result = CounterCell()
+ result.inc(self.value + other.value)
+ return result
+
+ def inc(self, n=1):
+ with self._lock:
+ self.value += n
+ self.commit.after_modification()
+
+ def get_cumulative(self):
+ with self._lock:
+ return self.value
+
+
+class DistributionCell(Distribution, MetricCell):
+ """Tracks the current value and delta for a distribution metric.
+
+ Each cell tracks the state of a metric independently per context per bundle.
+ Therefore, each metric has a different cell in each bundle, that is later
+ aggregated.
+
+ This class is thread safe.
+ """
+ def __init__(self, *args):
+ super(DistributionCell, self).__init__(*args)
+ self.data = DistributionData(0, 0, None, None)
+
+ def combine(self, other):
+ result = DistributionCell()
+ result.data = self.data.combine(other.data)
+ return result
+
+ def update(self, value):
+ with self._lock:
+ self.commit.after_modification()
+ self._update(value)
+
+ def _update(self, value):
+ value = int(value)
+ self.data.count += 1
+ self.data.sum += value
+ self.data.min = (value
+ if self.data.min is None or self.data.min > value
+ else self.data.min)
+ self.data.max = (value
+ if self.data.max is None or self.data.max < value
+ else self.data.max)
+
+ def get_cumulative(self):
+ with self._lock:
+ return self.data.get_cumulative()
+
+
+class DistributionResult(object):
+ """The result of a Distribution metric.
+ """
+ def __init__(self, data):
+ self.data = data
+
+ def __eq__(self, other):
+ return self.data == other.data
+
+ @property
+ def max(self):
+ return self.data.max
+
+ @property
+ def min(self):
+ return self.data.min
+
+ @property
+ def count(self):
+ return self.data.count
+
+ @property
+ def sum(self):
+ return self.data.sum
+
+ @property
+ def mean(self):
+ """Returns the float mean of the distribution.
+
+ If the distribution contains no elements, it returns None.
+ """
+ if self.data.count == 0:
+ return None
+ else:
+ return float(self.data.sum)/self.data.count
+
+
+class DistributionData(object):
+ """The data structure that holds data about a distribution metric.
+
+ Distribution metrics are restricted to distributions of integers only.
+
+ This object is not thread safe, so it's not supposed to be modified
+ by anything other than the DistributionCell that contains it.
+ """
+ def __init__(self, sum, count, min, max):
+ self.sum = sum
+ self.count = count
+ self.min = min
+ self.max = max
+
+ def __eq__(self, other):
+ return (self.sum == other.sum and
+ self.count == other.count and
+ self.min == other.min and
+ self.max == other.max)
+
+ def __neq__(self, other):
+ return not self.__eq__(other)
+
+ def __repr__(self):
+ return '<DistributionData({}, {}, {}, {})>'.format(self.sum,
+ self.count,
+ self.min,
+ self.max)
+
+ def get_cumulative(self):
+ return DistributionData(self.sum, self.count, self.min, self.max)
+
+ def combine(self, other):
+ if other is None:
+ return self
+ else:
+ new_min = (None if self.min is None and other.min is None else
+ min(x for x in (self.min, other.min) if x is not None))
+ new_max = (None if self.max is None and other.max is None else
+ max(x for x in (self.max, other.max) if x is not None))
+ return DistributionData(
+ self.sum + other.sum,
+ self.count + other.count,
+ new_min,
+ new_max)
+
+ @classmethod
+ def singleton(cls, value):
+ return DistributionData(value, 1, value, value)
+
+
+class MetricAggregator(object):
+ """Base interface for aggregating metric data during pipeline execution."""
+ def zero(self):
+ raise NotImplementedError
+
+ def combine(self, updates):
+ raise NotImplementedError
+
+ def result(self, x):
+ raise NotImplementedError
+
+
+class CounterAggregator(MetricAggregator):
+ """Aggregator for Counter metric data during pipeline execution.
+
+ Values aggregated should be ``int`` objects.
+ """
+ def zero(self):
+ return 0
+
+ def combine(self, x, y):
+ return int(x) + int(y)
+
+ def result(self, x):
+ return int(x)
+
+
+class DistributionAggregator(MetricAggregator):
+ """Aggregator for Distribution metric data during pipeline execution.
+
+ Values aggregated should be ``DistributionData`` objects.
+ """
+ def zero(self):
+ return DistributionData(0, 0, None, None)
+
+ def combine(self, x, y):
+ return x.combine(y)
+
+ def result(self, x):
+ return DistributionResult(x.get_cumulative())
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/cells_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
new file mode 100644
index 0000000..a4c8a43
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/cells_test.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+import unittest
+
+from apache_beam.metrics.cells import CounterCell
+from apache_beam.metrics.cells import DistributionCell
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import CellCommitState
+
+
+class TestCounterCell(unittest.TestCase):
+ @classmethod
+ def _modify_counter(cls, d):
+ for i in range(cls.NUM_ITERATIONS):
+ d.inc(i)
+
+ NUM_THREADS = 5
+ NUM_ITERATIONS = 100
+
+ def test_parallel_access(self):
+ # We create NUM_THREADS threads that concurrently modify the counter.
+ threads = []
+ c = CounterCell()
+ for _ in range(TestCounterCell.NUM_THREADS):
+ t = threading.Thread(target=TestCounterCell._modify_counter,
+ args=(c,))
+ threads.append(t)
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS-1)/2 * self.NUM_THREADS)
+ self.assertEqual(c.get_cumulative(), total)
+
+ def test_basic_operations(self):
+ c = CounterCell()
+ c.inc(2)
+ self.assertEqual(c.get_cumulative(), 2)
+
+ c.dec(10)
+ self.assertEqual(c.get_cumulative(), -8)
+
+ c.dec()
+ self.assertEqual(c.get_cumulative(), -9)
+
+ c.inc()
+ self.assertEqual(c.get_cumulative(), -8)
+
+
+class TestDistributionCell(unittest.TestCase):
+ @classmethod
+ def _modify_distribution(cls, d):
+ for i in range(cls.NUM_ITERATIONS):
+ d.update(i)
+
+ NUM_THREADS = 5
+ NUM_ITERATIONS = 100
+
+ def test_parallel_access(self):
+ # We create NUM_THREADS threads that concurrently modify the distribution.
+ threads = []
+ d = DistributionCell()
+ for _ in range(TestDistributionCell.NUM_THREADS):
+ t = threading.Thread(target=TestDistributionCell._modify_distribution,
+ args=(d,))
+ threads.append(t)
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ total = (self.NUM_ITERATIONS * (self.NUM_ITERATIONS-1)/2 * self.NUM_THREADS)
+
+ count = (self.NUM_ITERATIONS * self.NUM_THREADS)
+
+ self.assertEqual(d.get_cumulative(),
+ DistributionData(total, count, 0,
+ self.NUM_ITERATIONS - 1))
+
+ def test_basic_operations(self):
+ d = DistributionCell()
+ d.update(10)
+ self.assertEqual(d.get_cumulative(),
+ DistributionData(10, 1, 10, 10))
+
+ d.update(2)
+ self.assertEqual(d.get_cumulative(),
+ DistributionData(12, 2, 2, 10))
+
+ d.update(900)
+ self.assertEqual(d.get_cumulative(),
+ DistributionData(912, 3, 2, 900))
+
+ def test_integer_only(self):
+ d = DistributionCell()
+ d.update(3.1)
+ d.update(3.2)
+ d.update(3.3)
+ self.assertEqual(d.get_cumulative(),
+ DistributionData(9, 3, 3, 3))
+
+
+class TestCellCommitState(unittest.TestCase):
+ def test_basic_path(self):
+ ds = CellCommitState()
+ # Starts dirty
+ self.assertTrue(ds.before_commit())
+ ds.after_commit()
+ self.assertFalse(ds.before_commit())
+
+ # Make it dirty again
+ ds.after_modification()
+ self.assertTrue(ds.before_commit())
+ ds.after_commit()
+ self.assertFalse(ds.before_commit())
+
+ # Dirty again
+ ds.after_modification()
+ self.assertTrue(ds.before_commit())
+ ds.after_modification()
+ ds.after_commit()
+ self.assertTrue(ds.before_commit())
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
new file mode 100644
index 0000000..8f04b7b
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Internal classes for Metrics API.
+
+The classes in this file keep shared state, and organize metrics information.
+
+Available classes:
+- MetricKey - Internal key for a metric.
+- MetricResult - Current status of a metric's updates/commits.
+- MetricsEnvironment - Keeps track of MetricsContainer and other metrics
+ information for every single execution working thread.
+- MetricsContainer - Holds the metrics of a single step and a single
+ unit-of-commit (bundle).
+"""
+from collections import defaultdict
+import threading
+
+from apache_beam.metrics.cells import CounterCell, DistributionCell
+
+
+class MetricKey(object):
+ """Key used to identify instance of metric cell.
+
+ Metrics are internally keyed by the step name they are associated with and
+ the name of the metric.
+ """
+ def __init__(self, step, metric):
+ """Initializes ``MetricKey``.
+
+ Args:
+ step: A string with the step this metric cell is part of.
+ metric: A ``MetricName`` that identifies a metric.
+ """
+ self.step = step
+ self.metric = metric
+
+ def __eq__(self, other):
+ return (self.step == other.step and
+ self.metric == other.metric)
+
+ def __str__(self):
+ return 'MetricKey(step={}, metric={})'.format(
+ self.step, self.metric)
+
+ def __hash__(self):
+ return hash((self.step, self.metric))
+
+
+class MetricResult(object):
+ """Keeps track of the status of a metric within a single bundle.
+
+ It contains the physical and logical updates to the metric. Physical updates
+ are updates that have not necessarily been committed, but that have been made
+ during pipeline execution. Logical updates are updates that have been
+ committed.
+
+ Attributes:
+ key: A ``MetricKey`` that identifies the metric and bundle of this result.
+ committed: The committed updates of the metric. This attribute's type is
+ that of the underlying cell data (e.g. int, DistributionData).
+ attempted: The physical updates of the metric. This attribute's type is that
+ of the underlying cell data (e.g. int, DistributionData).
+ """
+ def __init__(self, key, committed, attempted):
+ """Initializes ``MetricResult``.
+ Args:
+ key: A ``MetricKey`` object.
+ committed: Metric data that has been committed (i.e. logical updates)
+ attempted: Metric data that has been attempted (i.e. physical updates)
+ """
+ self.key = key
+ self.committed = committed
+ self.attempted = attempted
+
+ def __eq__(self, other):
+ return (self.key == other.key and
+ self.committed == other.committed and
+ self.attempted == other.attempted)
+
+ def __str__(self):
+ return 'MetricResult(key={}, committed={}, attempted={})'.format(
+ self.key, self.committed, self.attempted)
+
+
+class MetricsEnvironment(object):
+ """Holds the MetricsContainer for every thread and other metric information.
+
+ This class is not meant to be instantiated, instead being used to keep
+ track of global state.
+ """
+ METRICS_SUPPORTED = False
+ _METRICS_SUPPORTED_LOCK = threading.Lock()
+
+ PER_THREAD = threading.local()
+
+ @classmethod
+ def set_metrics_supported(cls, supported):
+ with cls._METRICS_SUPPORTED_LOCK:
+ cls.METRICS_SUPPORTED = supported
+
+ @classmethod
+ def current_container(cls):
+ try:
+ return cls.PER_THREAD.container
+ except AttributeError:
+ return None
+
+ @classmethod
+ def set_current_container(cls, container):
+ cls.PER_THREAD.container = container
+
+ @classmethod
+ def unset_current_container(cls):
+ del cls.PER_THREAD.container
+
+
+class MetricsContainer(object):
+ """Holds the metrics of a single step and a single bundle."""
+ def __init__(self, step_name):
+ self.step_name = step_name
+ self.counters = defaultdict(lambda: CounterCell())
+ self.distributions = defaultdict(lambda: DistributionCell())
+
+ def get_counter(self, metric_name):
+ return self.counters[metric_name]
+
+ def get_distribution(self, metric_name):
+ return self.distributions[metric_name]
+
+ def _get_updates(self, filter=None):
+ """Return cumulative values of metrics filtered according to a lambda.
+
+ This returns all the cumulative values for all metrics after filtering
+ them with the filter parameter lambda function. If None is passed in,
+ then cumulative values for all metrics are returned.
+ """
+ if filter is None:
+ filter = lambda v: True
+ counters = {MetricKey(self.step_name, k): v.get_cumulative()
+ for k, v in self.counters.items()
+ if filter(v)}
+
+ distributions = {MetricKey(self.step_name, k): v.get_cumulative()
+ for k, v in self.distributions.items()
+ if filter(v)}
+
+ return MetricUpdates(counters, distributions)
+
+ def get_updates(self):
+ """Return cumulative values of metrics that changed since the last commit.
+
+ This returns all the cumulative values for all metrics only if their state
+ prior to the function call was COMMITTING or DIRTY.
+ """
+ return self._get_updates(filter=lambda v: v.commit.before_commit())
+
+ def get_cumulative(self):
+ """Return MetricUpdates with cumulative values of all metrics in container.
+
+ This returns all the cumulative values for all metrics regardless of whether
+ they have been committed or not.
+ """
+ return self._get_updates()
+
+
+class ScopedMetricsContainer(object):
+ def __init__(self, container):
+ self._old_container = MetricsEnvironment.current_container()
+ self._container = container
+
+ def __enter__(self):
+ MetricsEnvironment.set_current_container(self._container)
+ return self._container
+
+ def __exit__(self, type, value, traceback):
+ MetricsEnvironment.set_current_container(self._old_container)
+
+
+class MetricUpdates(object):
+ """Contains updates for several metrics.
+
+ A metric update is an object containing information to update a metric.
+ For Distribution metrics, it is DistributionData, and for Counter metrics,
+ it's an int.
+ """
+ def __init__(self, counters=None, distributions=None):
+ """Create a MetricUpdates object.
+
+ Args:
+ counters: Dictionary of MetricKey:MetricUpdate objects.
+ distributions: Dictionary of MetricKey:MetricUpdate objects.
+ """
+ self.counters = counters or {}
+ self.distributions = distributions or {}
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/execution_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
new file mode 100644
index 0000000..54569c1
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from apache_beam.metrics.cells import CellCommitState
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metricbase import MetricName
+
+
+class TestMetricsContainer(unittest.TestCase):
+ def test_create_new_counter(self):
+ mc = MetricsContainer('astep')
+ self.assertFalse(mc.counters.has_key(MetricName('namespace', 'name')))
+ mc.get_counter(MetricName('namespace', 'name'))
+ self.assertTrue(mc.counters.has_key(MetricName('namespace', 'name')))
+
+ def test_scoped_container(self):
+ c1 = MetricsContainer('mystep')
+ c2 = MetricsContainer('myinternalstep')
+ with ScopedMetricsContainer(c1):
+ self.assertEqual(c1, MetricsEnvironment.current_container())
+ counter = Metrics.counter('ns', 'name')
+ counter.inc(2)
+
+ with ScopedMetricsContainer(c2):
+ self.assertEqual(c2, MetricsEnvironment.current_container())
+ counter = Metrics.counter('ns', 'name')
+ counter.inc(3)
+ self.assertEqual(
+ c2.get_cumulative().counters.items(),
+ [(MetricKey('myinternalstep', MetricName('ns', 'name')), 3)])
+
+ self.assertEqual(c1, MetricsEnvironment.current_container())
+ counter = Metrics.counter('ns', 'name')
+ counter.inc(4)
+ self.assertEqual(
+ c1.get_cumulative().counters.items(),
+ [(MetricKey('mystep', MetricName('ns', 'name')), 6)])
+
+ def test_add_to_counter(self):
+ mc = MetricsContainer('astep')
+ counter = mc.get_counter(MetricName('namespace', 'name'))
+ counter.inc()
+ counter = mc.get_counter(MetricName('namespace', 'name'))
+ self.assertEqual(counter.value, 1)
+
+ def test_get_cumulative_or_updates(self):
+ mc = MetricsContainer('astep')
+
+ clean_values = []
+ dirty_values = []
+ for i in range(1, 11):
+ counter = mc.get_counter(MetricName('namespace', 'name{}'.format(i)))
+ distribution = mc.get_distribution(
+ MetricName('namespace', 'name{}'.format(i)))
+ counter.inc(i)
+ distribution.update(i)
+ if i % 2 == 0:
+ # Some are left to be DIRTY (i.e. not yet committed).
+ # Some are left to be CLEAN (i.e. already committed).
+ dirty_values.append(i)
+ continue
+ # Assert: Counter/Distribution is DIRTY or COMMITTING (not CLEAN)
+ self.assertEqual(distribution.commit.before_commit(), True)
+ self.assertEqual(counter.commit.before_commit(), True)
+ distribution.commit.after_commit()
+ counter.commit.after_commit()
+ # Assert: Counter/Distribution has been committed, therefore it's CLEAN
+ self.assertEqual(counter.commit.state, CellCommitState.CLEAN)
+ self.assertEqual(distribution.commit.state, CellCommitState.CLEAN)
+ clean_values.append(i)
+
+ # Retrieve NON-COMMITTED updates.
+ logical = mc.get_updates()
+ self.assertEqual(len(logical.counters), 5)
+ self.assertEqual(len(logical.distributions), 5)
+ self.assertEqual(set(dirty_values),
+ set([v for _, v in logical.counters.items()]))
+ # Retrieve ALL updates.
+ cumulative = mc.get_cumulative()
+ self.assertEqual(len(cumulative.counters), 10)
+ self.assertEqual(len(cumulative.distributions), 10)
+ self.assertEqual(set(dirty_values + clean_values),
+ set([v for _, v in cumulative.counters.items()]))
+
+
+class TestMetricsEnvironment(unittest.TestCase):
+ def test_uses_right_container(self):
+ c1 = MetricsContainer('step1')
+ c2 = MetricsContainer('step2')
+ counter = Metrics.counter('ns', 'name')
+ MetricsEnvironment.set_current_container(c1)
+ counter.inc()
+ MetricsEnvironment.set_current_container(c2)
+ counter.inc(3)
+ MetricsEnvironment.unset_current_container()
+
+ self.assertEqual(
+ c1.get_cumulative().counters.items(),
+ [(MetricKey('step1', MetricName('ns', 'name')), 1)])
+
+ self.assertEqual(
+ c2.get_cumulative().counters.items(),
+ [(MetricKey('step2', MetricName('ns', 'name')), 3)])
+
+ def test_no_container(self):
+ self.assertEqual(MetricsEnvironment.current_container(),
+ None)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
new file mode 100644
index 0000000..13ca77b
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+User-facing classes for Metrics API.
+
+The classes in this file allow users to define and use metrics to be collected
+and displayed as part of their pipeline execution.
+
+- Metrics - This class lets pipeline and transform writers create and access
+ metric objects such as counters, distributions, etc.
+"""
+import inspect
+
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metricbase import Counter, Distribution
+from apache_beam.metrics.metricbase import MetricName
+
+
+class Metrics(object):
+ """Lets users create/access metric objects during pipeline execution.
+ """
+ @staticmethod
+ def get_namespace(namespace):
+ if inspect.isclass(namespace):
+ return '{}.{}'.format(namespace.__module__, namespace.__name__)
+ elif isinstance(namespace, str):
+ return namespace
+ else:
+ raise ValueError('Unknown namespace type')
+
+ @staticmethod
+ def counter(namespace, name):
+ """Obtains or creates a Counter metric.
+
+ Args:
+ namespace: A class or string that gives the namespace to a metric
+ name: A string that gives a unique name to a metric
+
+ Returns:
+ A Counter object.
+ """
+ namespace = Metrics.get_namespace(namespace)
+ return Metrics.DelegatingCounter(MetricName(namespace, name))
+
+ @staticmethod
+ def distribution(namespace, name):
+ """Obtains or creates a Distribution metric.
+
+ Distribution metrics are restricted to integer-only distributions.
+
+ Args:
+ namespace: A class or string that gives the namespace to a metric
+ name: A string that gives a unique name to a metric
+
+ Returns:
+ A Distribution object.
+ """
+ namespace = Metrics.get_namespace(namespace)
+ return Metrics.DelegatingDistribution(MetricName(namespace, name))
+
+ class DelegatingCounter(Counter):
+ def __init__(self, metric_name):
+ self.metric_name = metric_name
+
+ def inc(self, n=1):
+ container = MetricsEnvironment.current_container()
+ if container is not None:
+ container.get_counter(self.metric_name).inc(n)
+
+ class DelegatingDistribution(Distribution):
+ def __init__(self, metric_name):
+ self.metric_name = metric_name
+
+ def update(self, value):
+ container = MetricsEnvironment.current_container()
+ if container is not None:
+ container.get_distribution(self.metric_name).update(value)
+
+
+class MetricResults(object):
+  """Base class for runner-specific objects that answer metric queries."""
+
+  @staticmethod
+  def matches(filter, metric_key):
+    """Reports whether ``metric_key`` is selected by ``filter``.
+
+    Args:
+      filter: None, or an object exposing ``steps``, ``namespaces`` and
+        ``names`` collections.
+      metric_key: An object exposing ``step``, ``metric.namespace`` and
+        ``metric.name``.
+
+    Returns:
+      True if ``filter`` is None, or if every non-empty collection of the
+      filter contains the matching component of ``metric_key``. False
+      otherwise.
+    """
+    if filter is None:
+      return True
+
+    # An empty collection places no restriction on its dimension. Requiring
+    # membership in all three would make a filter that only names a step (or
+    # only a metric name) reject every key.
+    return ((not filter.steps or metric_key.step in filter.steps) and
+            (not filter.namespaces or
+             metric_key.metric.namespace in filter.namespaces) and
+            (not filter.names or metric_key.metric.name in filter.names))
+
+  def query(self, filter):
+    """Returns the metrics selected by ``filter``. Subclasses override it."""
+    raise NotImplementedError
+
+
+class MetricsFilter(object):
+  """Simple object to filter metrics results.
+
+  It filters by matching a result's step-namespace-name with three internal
+  sets. No execution/matching logic is added to this object, so that it may
+  be used to construct arguments as an RPC request. It is left for runners
+  to implement matching logic by themselves.
+
+  Every ``with_*`` method adds to the corresponding set and returns the
+  filter itself, so calls can be chained.
+  """
+  def __init__(self):
+    self._names = set()
+    self._namespaces = set()
+    self._steps = set()
+
+  @property
+  def steps(self):
+    """A frozenset copy of the steps added so far."""
+    return frozenset(self._steps)
+
+  @property
+  def names(self):
+    """A frozenset copy of the metric names added so far."""
+    return frozenset(self._names)
+
+  @property
+  def namespaces(self):
+    """A frozenset copy of the namespaces added so far."""
+    return frozenset(self._namespaces)
+
+  def with_name(self, name):
+    """Adds a single metric name and returns this filter."""
+    return self.with_names([name])
+
+  def with_names(self, names):
+    """Adds every metric name in ``names`` and returns this filter.
+
+    Raises:
+      ValueError: If ``names`` is a str rather than an iterable of names.
+    """
+    if isinstance(names, str):
+      raise ValueError('Names must be an iterable, not a string')
+
+    # Names go into their own set; storing them with the steps would leave
+    # the ``names`` property empty and pollute ``steps``.
+    self._names.update(names)
+    return self
+
+  def with_namespace(self, namespace):
+    """Adds a single namespace and returns this filter."""
+    return self.with_namespaces([namespace])
+
+  def with_namespaces(self, namespaces):
+    """Adds every namespace in ``namespaces`` and returns this filter.
+
+    Each entry is converted with ``Metrics.get_namespace`` before it is
+    stored.
+
+    Raises:
+      ValueError: If ``namespaces`` is a str rather than an iterable.
+    """
+    if isinstance(namespaces, str):
+      raise ValueError('Namespaces must be an iterable, not a string')
+
+    self._namespaces.update([Metrics.get_namespace(ns) for ns in namespaces])
+    return self
+
+  def with_step(self, step):
+    """Adds a single step and returns this filter."""
+    return self.with_steps([step])
+
+  def with_steps(self, steps):
+    """Adds every step in ``steps`` and returns this filter.
+
+    Raises:
+      ValueError: If ``steps`` is a str rather than an iterable of steps.
+    """
+    # Validate the argument that was actually passed in.
+    if isinstance(steps, str):
+      raise ValueError('Steps must be an iterable, not a string')
+
+    self._steps.update(steps)
+    return self
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
new file mode 100644
index 0000000..c478a85
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metric import Metrics
+from apache_beam.metrics.metricbase import MetricName
+
+
+class NameTest(unittest.TestCase):
+ def test_basic_metric_name(self):
+ name = MetricName('namespace1', 'name1')
+ self.assertEqual(name.namespace, 'namespace1')
+ self.assertEqual(name.name, 'name1')
+ self.assertEqual(name, MetricName('namespace1', 'name1'))
+
+ key = MetricKey('step1', name)
+ self.assertEqual(key.step, 'step1')
+ self.assertEqual(key.metric.namespace, 'namespace1')
+ self.assertEqual(key.metric.name, 'name1')
+ self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1')))
+
+
+class MetricsTest(unittest.TestCase):
+ def test_get_namespace_class(self):
+ class MyClass(object):
+ pass
+
+ self.assertEqual('{}.{}'.format(MyClass.__module__, MyClass.__name__),
+ Metrics.get_namespace(MyClass))
+
+ def test_get_namespace_string(self):
+ namespace = 'MyNamespace'
+ self.assertEqual(namespace, Metrics.get_namespace(namespace))
+
+ def test_get_namespace_error(self):
+ with self.assertRaises(ValueError):
+ Metrics.get_namespace(object())
+
+ def test_create_counter_distribution(self):
+ MetricsEnvironment.set_current_container(MetricsContainer('mystep'))
+ counter_ns = 'aCounterNamespace'
+ distro_ns = 'aDistributionNamespace'
+ name = 'aName'
+ counter = Metrics.counter(counter_ns, name)
+ distro = Metrics.distribution(distro_ns, name)
+ counter.inc(10)
+ counter.dec(3)
+ distro.update(10)
+ distro.update(2)
+ self.assertTrue(isinstance(counter, Metrics.DelegatingCounter))
+ self.assertTrue(isinstance(distro, Metrics.DelegatingDistribution))
+
+ del distro
+ del counter
+
+ container = MetricsEnvironment.current_container()
+ self.assertEqual(
+ container.counters[MetricName(counter_ns, name)].get_cumulative(),
+ 7)
+ self.assertEqual(
+ container.distributions[MetricName(distro_ns, name)].get_cumulative(),
+ DistributionData(12, 2, 2, 10))
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/metrics/metricbase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py
new file mode 100644
index 0000000..1ad6962
--- /dev/null
+++ b/sdks/python/apache_beam/metrics/metricbase.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The classes in this file are interfaces for metrics. They are not intended
+to be subclassed or created directly by users. To work with and access metrics,
+ users should use the classes and methods exposed in metric.py.
+
+Available classes:
+- Metric - Base interface of a metrics object.
+- Counter - Counter metric interface. Allows a count to be incremented or
+ decremented during pipeline execution.
+- Distribution - Distribution Metric interface. Allows statistics about the
+ distribution of a variable to be collected during pipeline execution.
+- MetricName - Namespace and name used to refer to a Metric.
+"""
+
+
+class MetricName(object):
+  """The name of a metric.
+
+  The name of a metric consists of a namespace and a name. The namespace
+  allows grouping related metrics together and also prevents collisions
+  between multiple metrics of the same name.
+
+  Instances compare equal, and hash equally, when both their namespace and
+  their name are equal.
+  """
+  def __init__(self, namespace, name):
+    """Initializes ``MetricName``.
+
+    Args:
+      namespace: A string with the namespace of a metric.
+      name: A string with the name of a metric.
+    """
+    self.namespace = namespace
+    self.name = name
+
+  def __eq__(self, other):
+    # Defer to the other operand for foreign types instead of failing with
+    # AttributeError on a missing ``namespace``/``name`` attribute.
+    if not isinstance(other, MetricName):
+      return NotImplemented
+    return (self.namespace == other.namespace and
+            self.name == other.name)
+
+  def __ne__(self, other):
+    # Python 2 does not derive ``!=`` from ``__eq__``; without this method
+    # two equal names would still compare as different.
+    return not self == other
+
+  def __str__(self):
+    return 'MetricName(namespace={}, name={})'.format(
+        self.namespace, self.name)
+
+  def __hash__(self):
+    return hash((self.namespace, self.name))
+
+
+class Metric(object):
+ """Base interface of a metric object."""
+ pass
+
+
+class Counter(Metric):
+ """Counter metric interface. Allows a count to be incremented/decremented
+ during pipeline execution."""
+ def inc(self, n=1):
+ raise NotImplementedError
+
+ def dec(self, n=1):
+ self.inc(-n)
+
+
+class Distribution(Metric):
+ """Distribution Metric interface. Allows statistics about the
+ distribution of a variable to be collected during pipeline execution."""
+ def update(self, value):
+ raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py b/sdks/python/apache_beam/runners/direct/direct_metrics.py
new file mode 100644
index 0000000..9d23487
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DirectRunner implementation of MetricResults. It is in charge not only of
+responding to queries of current metrics, but also of keeping the common
+state consistent.
+"""
+from collections import defaultdict
+import threading
+
+from apache_beam.metrics.cells import CounterAggregator
+from apache_beam.metrics.cells import DistributionAggregator
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.metric import MetricResults
+
+
+class DirectMetrics(MetricResults):
+ def __init__(self):
+ self._counters = defaultdict(
+ lambda: DirectMetric(CounterAggregator()))
+ self._distributions = defaultdict(
+ lambda: DirectMetric(DistributionAggregator()))
+
+ def _apply_operation(self, bundle, updates, op):
+ for k, v in updates.counters.items():
+ op(self._counters[k], bundle, v)
+
+ for k, v in updates.distributions.items():
+ op(self._distributions[k], bundle, v)
+
+ def commit_logical(self, bundle, updates):
+ op = lambda obj, bundle, update: obj.commit_logical(bundle, update)
+ self._apply_operation(bundle, updates, op)
+
+ def commit_physical(self, bundle, updates):
+ op = lambda obj, bundle, update: obj.commit_physical(bundle, update)
+ self._apply_operation(bundle, updates, op)
+
+ def update_physical(self, bundle, updates):
+ op = lambda obj, bundle, update: obj.update_physical(bundle, update)
+ self._apply_operation(bundle, updates, op)
+
+ def query(self, filter=None):
+ counters = [MetricResult(MetricKey(k.step, k.metric),
+ v.extract_committed(),
+ v.extract_latest_attempted())
+ for k, v in self._counters.items()
+ if self.matches(filter, k)]
+ distributions = [MetricResult(MetricKey(k.step, k.metric),
+ v.extract_committed(),
+ v.extract_latest_attempted())
+ for k, v in self._distributions.items()
+ if self.matches(filter, k)]
+
+ return {'counters': counters,
+ 'distributions': distributions}
+
+
+class DirectMetric(object):
+  """Keeps a consistent state for a single metric.
+
+  It keeps track of the metric's physical and logical updates. Attempted
+  (physical) state and committed (logical) state are each guarded by their
+  own lock, and every method that reads or writes a piece of state holds
+  the matching lock while doing so.
+  """
+  def __init__(self, aggregator):
+    """Initializes the metric with empty attempted and committed state.
+
+    Args:
+      aggregator: Object providing ``zero()``, ``combine(a, b)`` and
+        ``result(x)``; it defines how updates are merged and reported.
+    """
+    self.aggregator = aggregator
+    self._attempted_lock = threading.Lock()
+    self.finished_attempted = aggregator.zero()
+    # Latest update reported by each bundle that has not finished yet.
+    self.inflight_attempted = {}
+    self._committed_lock = threading.Lock()
+    self.finished_committed = aggregator.zero()
+
+  def commit_logical(self, bundle, update):
+    """Merges ``update`` into the committed value."""
+    with self._committed_lock:
+      self.finished_committed = self.aggregator.combine(
+          update, self.finished_committed)
+
+  def commit_physical(self, bundle, update):
+    """Merges the final ``update`` of ``bundle`` into the attempted value.
+
+    The bundle stops being in flight, so any update stored for it earlier
+    is dropped; it is superseded by ``update``.
+    """
+    with self._attempted_lock:
+      self.finished_attempted = self.aggregator.combine(
+          update, self.finished_attempted)
+      self.inflight_attempted.pop(bundle, None)
+
+  def update_physical(self, bundle, update):
+    """Records ``update`` as the latest in-flight value for ``bundle``."""
+    # Taken under the lock so the dict never changes while
+    # extract_latest_attempted is iterating over it in another thread.
+    with self._attempted_lock:
+      self.inflight_attempted[bundle] = update
+
+  def extract_committed(self):
+    """Returns the aggregator's result for the committed value."""
+    with self._committed_lock:
+      committed = self.finished_committed
+    return self.aggregator.result(committed)
+
+  def extract_latest_attempted(self):
+    """Returns the result for finished plus in-flight attempted updates."""
+    with self._attempted_lock:
+      res = self.finished_attempted
+      for u in self.inflight_attempted.values():
+        res = self.aggregator.combine(res, u)
+
+    return self.aggregator.result(res)
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics_test.py b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
new file mode 100644
index 0000000..256b91f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics_test.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+import hamcrest as hc
+
+from apache_beam.metrics.metricbase import MetricName
+from apache_beam.metrics.execution import MetricUpdates
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.runners.direct.direct_metrics import DirectMetrics
+
+
+class DirectMetricsTest(unittest.TestCase):
+ name1 = MetricName('namespace1', 'name1')
+ name2 = MetricName('namespace1', 'name2')
+ name3 = MetricName('namespace2', 'name1')
+
+ bundle1 = object() # For this test, any object can be a bundle
+ bundle2 = object()
+
+ def test_combiner_functions(self):
+ metrics = DirectMetrics()
+ counter = metrics._counters['anykey']
+ counter.commit_logical(self.bundle1, 5)
+ self.assertEqual(counter.extract_committed(), 5)
+ with self.assertRaises(TypeError):
+ counter.commit_logical(self.bundle1, None)
+
+ distribution = metrics._distributions['anykey']
+ distribution.commit_logical(self.bundle1, DistributionData(4, 1, 4, 4))
+ self.assertEqual(distribution.extract_committed(),
+ DistributionResult(DistributionData(4, 1, 4, 4)))
+
+ with self.assertRaises(AttributeError):
+ distribution.commit_logical(self.bundle1, None)
+
+ def test_commit_logical_no_filter(self):
+ metrics = DirectMetrics()
+ metrics.commit_logical(
+ self.bundle1,
+ MetricUpdates(
+ counters={MetricKey('step1', self.name1): 5,
+ MetricKey('step1', self.name2): 8},
+ distributions={
+ MetricKey('step1', self.name1): DistributionData(8, 2, 3, 5)}))
+
+ metrics.commit_logical(
+ self.bundle1,
+ MetricUpdates(
+ counters={MetricKey('step2', self.name1): 7,
+ MetricKey('step1', self.name2): 4},
+ distributions={
+ MetricKey('step1', self.name1): DistributionData(4, 1, 4, 4)}))
+
+ results = metrics.query()
+ hc.assert_that(
+ results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name2), 12, 0),
+ MetricResult(MetricKey('step2', self.name1), 7, 0),
+ MetricResult(MetricKey('step1', self.name1), 5, 0)]))
+ hc.assert_that(
+ results['distributions'],
+ hc.contains_inanyorder(
+ MetricResult(MetricKey('step1', self.name1),
+ DistributionResult(
+ DistributionData(12, 3, 3, 5)),
+ DistributionResult(
+ DistributionData(0, 0, None, None)))))
+
+ def test_apply_physical_no_filter(self):
+ metrics = DirectMetrics()
+ metrics.update_physical(object(),
+ MetricUpdates(
+ counters={MetricKey('step1', self.name1): 5,
+ MetricKey('step1', self.name3): 8}))
+
+ metrics.update_physical(object(),
+ MetricUpdates(
+ counters={MetricKey('step2', self.name1): 7,
+ MetricKey('step1', self.name3): 4}))
+ results = metrics.query()
+ hc.assert_that(results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name1), 0, 5),
+ MetricResult(MetricKey('step1', self.name3), 0, 12),
+ MetricResult(MetricKey('step2', self.name1), 0, 7)]))
+
+ metrics.commit_physical(object(), MetricUpdates())
+ results = metrics.query()
+ hc.assert_that(results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name1), 0, 5),
+ MetricResult(MetricKey('step1', self.name3), 0, 12),
+ MetricResult(MetricKey('step2', self.name1), 0, 7)]))
+
+ def test_apply_physical_logical(self):
+ metrics = DirectMetrics()
+ dist_zero = DistributionData(0, 0, None, None)
+ metrics.update_physical(
+ object(),
+ MetricUpdates(
+ counters={MetricKey('step1', self.name1): 7,
+ MetricKey('step1', self.name2): 5,
+ MetricKey('step2', self.name1): 1},
+ distributions={MetricKey('step1', self.name1):
+ DistributionData(3, 1, 3, 3),
+ MetricKey('step2', self.name3):
+ DistributionData(8, 2, 4, 4)}))
+ results = metrics.query()
+ hc.assert_that(results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name1), 0, 7),
+ MetricResult(MetricKey('step1', self.name2), 0, 5),
+ MetricResult(MetricKey('step2', self.name1), 0, 1)]))
+ hc.assert_that(results['distributions'],
+ hc.contains_inanyorder(*[
+ MetricResult(
+ MetricKey('step1', self.name1),
+ DistributionResult(dist_zero),
+ DistributionResult(DistributionData(3, 1, 3, 3))),
+ MetricResult(
+ MetricKey('step2', self.name3),
+ DistributionResult(dist_zero),
+ DistributionResult(DistributionData(8, 2, 4, 4)))]))
+
+ metrics.commit_physical(
+ object(),
+ MetricUpdates(
+ counters={MetricKey('step1', self.name1): -3,
+ MetricKey('step2', self.name1): -5},
+ distributions={MetricKey('step1', self.name1):
+ DistributionData(8, 4, 1, 5),
+ MetricKey('step2', self.name2):
+ DistributionData(8, 8, 1, 1)}))
+ results = metrics.query()
+ hc.assert_that(results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name1), 0, 4),
+ MetricResult(MetricKey('step1', self.name2), 0, 5),
+ MetricResult(MetricKey('step2', self.name1), 0, -4)]))
+ hc.assert_that(results['distributions'],
+ hc.contains_inanyorder(*[
+ MetricResult(
+ MetricKey('step1', self.name1),
+ DistributionResult(dist_zero),
+ DistributionResult(DistributionData(11, 5, 1, 5))),
+ MetricResult(
+ MetricKey('step2', self.name3),
+ DistributionResult(dist_zero),
+ DistributionResult(DistributionData(8, 2, 4, 4))),
+ MetricResult(
+ MetricKey('step2', self.name2),
+ DistributionResult(dist_zero),
+ DistributionResult(DistributionData(8, 8, 1, 1)))]))
+
+ metrics.commit_logical(
+ object(),
+ MetricUpdates(
+ counters={MetricKey('step1', self.name1): 3,
+ MetricKey('step1', self.name2): 5,
+ MetricKey('step2', self.name1): -3},
+ distributions={MetricKey('step1', self.name1):
+ DistributionData(11, 5, 1, 5),
+ MetricKey('step2', self.name2):
+ DistributionData(8, 8, 1, 1),
+ MetricKey('step2', self.name3):
+ DistributionData(4, 1, 4, 4)}))
+
+ results = metrics.query()
+ hc.assert_that(results['counters'],
+ hc.contains_inanyorder(*[
+ MetricResult(MetricKey('step1', self.name1), 3, 4),
+ MetricResult(MetricKey('step1', self.name2), 5, 5),
+ MetricResult(MetricKey('step2', self.name1), -3, -4)]))
+ hc.assert_that(results['distributions'],
+ hc.contains_inanyorder(*[
+ MetricResult(
+ MetricKey('step1', self.name1),
+ DistributionResult(DistributionData(11, 5, 1, 5)),
+ DistributionResult(DistributionData(11, 5, 1, 5))),
+ MetricResult(
+ MetricKey('step2', self.name3),
+ DistributionResult(DistributionData(4, 1, 4, 4)),
+ DistributionResult(DistributionData(8, 2, 4, 4))),
+ MetricResult(
+ MetricKey('step2', self.name2),
+ DistributionResult(DistributionData(8, 8, 1, 1)),
+ DistributionResult(DistributionData(8, 8, 1, 1)))]))
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 523eb05..a5c616b 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -27,6 +27,7 @@ import collections
import logging
from apache_beam.runners.direct.bundle_factory import BundleFactory
+from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
@@ -52,6 +53,7 @@ class DirectRunner(PipelineRunner):
from apache_beam.runners.direct.transform_evaluator import \
TransformEvaluatorRegistry
+ MetricsEnvironment.set_metrics_supported(True)
logging.info('Running pipeline with DirectRunner.')
self.visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.visitor)
@@ -152,6 +154,9 @@ class DirectPipelineResult(PipelineResult):
def aggregated_values(self, aggregator_or_name):
return self._evaluation_context.get_aggregator_values(aggregator_or_name)
+ def metrics(self):
+ return self._evaluation_context.metrics()
+
class EagerRunner(DirectRunner):
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 48d353b..34701f5 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -26,6 +26,7 @@ from apache_beam.transforms import sideinputs
from apache_beam.runners.direct.clock import Clock
from apache_beam.runners.direct.watermark_manager import WatermarkManager
from apache_beam.runners.direct.executor import TransformExecutor
+from apache_beam.runners.direct.direct_metrics import DirectMetrics
from apache_beam.utils import counters
@@ -142,6 +143,7 @@ class EvaluationContext(object):
self._pending_unblocked_tasks = []
self._counter_factory = counters.CounterFactory()
self._cache = None
+ self._metrics = DirectMetrics()
self._lock = threading.Lock()
@@ -149,6 +151,10 @@ class EvaluationContext(object):
assert not self._cache
self._cache = cache
+ def metrics(self):
+ # TODO. Should this be made a @property?
+ return self._metrics
+
@property
def has_cache(self):
return self._cache is not None
@@ -187,6 +193,9 @@ class EvaluationContext(object):
completed_bundle, result.transform, completed_timers,
committed_bundles, result.watermark_hold)
+ self._metrics.commit_logical(completed_bundle,
+ result.logical_metric_updates())
+
# If the result is for a view, update side inputs container.
if (result.output_bundles
and result.output_bundles[0].pcollection in self.views):
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 378aecf..7e404f8 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -26,6 +26,9 @@ import threading
import traceback
from weakref import WeakValueDictionary
+from apache_beam.metrics.execution import MetricsContainer
+from apache_beam.metrics.execution import MetricsEnvironment
+
class ExecutorService(object):
"""Thread pool for executing tasks in parallel."""
@@ -266,6 +269,8 @@ class TransformExecutor(ExecutorService.CallableTask):
def __call__(self):
self._call_count += 1
assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+ metrics_container = MetricsContainer(self._applied_transform.full_label)
+ MetricsEnvironment.set_current_container(metrics_container)
for side_input in self._applied_transform.side_inputs:
if side_input not in self._side_input_values:
@@ -290,6 +295,7 @@ class TransformExecutor(ExecutorService.CallableTask):
evaluator.process_element(value)
result = evaluator.finish_bundle()
+ result.metric_updates = metrics_container.get_cumulative()
if self._evaluation_context.has_cache:
for uncommitted_bundle in result.output_bundles:
@@ -308,6 +314,9 @@ class TransformExecutor(ExecutorService.CallableTask):
logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True)
self._completion_callback.handle_exception(e)
finally:
+ self._evaluation_context.metrics().commit_physical(
+ self._input_bundle,
+ metrics_container.get_cumulative())
self._transform_evaluation_state.complete(self)
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
index 298e629..59597dc 100644
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -34,6 +34,10 @@ class TransformResult(object):
self._watermark_hold = watermark_hold
# Only used when caching (materializing) all values is requested.
self._undeclared_tag_values = undeclared_tag_values
+ self.metric_updates = None
+
+ def logical_metric_updates(self):
+ return self.metric_updates
@property
def transform(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/88bad042/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index ff6a22e..ea86061 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -26,6 +26,8 @@ from datetime import datetime
import json
import unittest
+import hamcrest as hc
+
import apache_beam as beam
from apache_beam.internal import apiclient
@@ -38,6 +40,12 @@ import apache_beam.transforms as ptransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import DistributionResult
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.metricbase import MetricName
+
class RunnerTest(unittest.TestCase):
default_properties = [
@@ -135,6 +143,41 @@ class RunnerTest(unittest.TestCase):
self.assertEqual(len(disp_data), 3)
self.assertEqual(disp_data, expected_data)
+ def test_direct_runner_metrics(self):
+ from apache_beam.metrics.metric import Metrics
+
+ class MyDoFn(beam.DoFn):
+ def process(self, context):
+ count = Metrics.counter(self.__class__, 'elements')
+ count.inc()
+ distro = Metrics.distribution(self.__class__, 'element-dist')
+ distro.update(context.element)
+ return [context.element]
+
+ runner = DirectRunner()
+ p = Pipeline(runner,
+ options=PipelineOptions(self.default_properties))
+ # pylint: disable=expression-not-assigned
+ (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
+ | 'do' >> beam.ParDo(MyDoFn()))
+ result = p.run()
+ metrics = result.metrics().query()
+ namespace = '{}.{}'.format(MyDoFn.__module__,
+ MyDoFn.__name__)
+ hc.assert_that(
+ metrics['counters'],
+ hc.contains_inanyorder(
+ MetricResult(
+ MetricKey('do', MetricName(namespace, 'elements')),
+ 5, 5)))
+ hc.assert_that(
+ metrics['distributions'],
+ hc.contains_inanyorder(
+ MetricResult(
+ MetricKey('do', MetricName(namespace, 'element-dist')),
+ DistributionResult(DistributionData(15, 5, 1, 5)),
+ DistributionResult(DistributionData(15, 5, 1, 5)))))
+
def test_no_group_by_key_directly_after_bigquery(self):
remote_runner = DataflowRunner()
p = Pipeline(remote_runner,
[2/2] beam git commit: Closes #1420
Posted by bc...@apache.org.
Closes #1420
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1906416
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1906416
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1906416
Branch: refs/heads/python-sdk
Commit: d1906416e97d5f9b95dacb42137e6662c332e3dd
Parents: 8bf4c80 88bad04
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 16:02:59 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Thu Dec 29 16:02:59 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/metrics/__init__.py | 17 +
sdks/python/apache_beam/metrics/cells.py | 315 +++++++++++++++++++
sdks/python/apache_beam/metrics/cells_test.py | 143 +++++++++
sdks/python/apache_beam/metrics/execution.py | 210 +++++++++++++
.../apache_beam/metrics/execution_test.py | 131 ++++++++
sdks/python/apache_beam/metrics/metric.py | 165 ++++++++++
sdks/python/apache_beam/metrics/metric_test.py | 85 +++++
sdks/python/apache_beam/metrics/metricbase.py | 81 +++++
.../runners/direct/direct_metrics.py | 112 +++++++
.../runners/direct/direct_metrics_test.py | 211 +++++++++++++
.../apache_beam/runners/direct/direct_runner.py | 5 +
.../runners/direct/evaluation_context.py | 9 +
.../apache_beam/runners/direct/executor.py | 9 +
.../runners/direct/transform_result.py | 4 +
sdks/python/apache_beam/runners/runner_test.py | 43 +++
15 files changed, 1540 insertions(+)
----------------------------------------------------------------------