You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/24 16:48:27 UTC
[1/2] beam git commit: Adding IOTargetName and unittests for CounterName
Repository: beam
Updated Branches:
refs/heads/master 5181e619f -> 260c4fce8
Adding IOTargetName and unittests for CounterName
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/185daffa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/185daffa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/185daffa
Branch: refs/heads/master
Commit: 185daffa53595b3a6d900252d69132c85013aa4c
Parents: 5181e61
Author: Pablo <pa...@google.com>
Authored: Tue Aug 22 16:53:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Aug 24 09:47:40 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/counters.py | 58 +++++++++------
sdks/python/apache_beam/utils/counters_test.py | 78 +++++++++++++++++++++
2 files changed, 115 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index 5d029dc..08685aa 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -23,40 +23,56 @@
For internal use only; no backwards-compatibility guarantees.
"""
+from collections import namedtuple
import threading
+
from apache_beam.transforms import cy_combiners
-class CounterName(object):
+# Information identifying the IO being measured by a counter.
+IOTargetName = namedtuple('IOTargetName', ['side_input_step_name',
+ 'side_input_index',
+ 'original_shuffle_step_name'])
+
+
+def side_input_id(step_name, input_index):
+ """Create an IOTargetName that identifies the reading of a side input."""
+ return IOTargetName(step_name, input_index, None)
+
+
+def shuffle_id(step_name):
+ """Create an IOTargetName that identifies a GBK step."""
+ return IOTargetName(None, None, step_name)
+
+
+_CounterName = namedtuple('_CounterName', ['name',
+ 'stage_name',
+ 'step_name',
+ 'system_name',
+ 'namespace',
+ 'origin',
+ 'output_index',
+ 'io_target'])
+
+
+class CounterName(_CounterName):
"""Naming information for a counter."""
SYSTEM = object()
USER = object()
- def __init__(self, name, stage_name=None, step_name=None,
- system_name=None, namespace=None,
- origin=None, output_index=None):
- self.name = name
- self.origin = origin or CounterName.SYSTEM
- self.namespace = namespace
- self.stage_name = stage_name
- self.step_name = step_name
- self.system_name = system_name
- self.output_index = output_index
-
- def __hash__(self):
- return hash((self.name,
- self.origin,
- self.namespace,
- self.stage_name,
- self.step_name,
- self.system_name,
- self.output_index))
+ def __new__(cls, name, stage_name=None, step_name=None,
+ system_name=None, namespace=None,
+ origin=None, output_index=None, io_target=None):
+ origin = origin or CounterName.SYSTEM
+ return super(CounterName, cls).__new__(cls, name, stage_name, step_name,
+ system_name, namespace,
+ origin, output_index, io_target)
def __str__(self):
return '%s' % self._str_internal()
def __repr__(self):
- return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+ return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
def _str_internal(self):
if self.origin == CounterName.USER:
http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py
new file mode 100644
index 0000000..37cab88
--- /dev/null
+++ b/sdks/python/apache_beam/utils/counters_test.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for counters and counter names."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.utils import counters
+from apache_beam.utils.counters import CounterName
+
+
+class CounterNameTest(unittest.TestCase):
+
+ def test_equal_objects(self):
+ self.assertEqual(CounterName('counter_name',
+ 'stage_name',
+ 'step_name'),
+ CounterName('counter_name',
+ 'stage_name',
+ 'step_name'))
+ self.assertNotEqual(CounterName('counter_name',
+ 'stage_name',
+ 'step_name'),
+ CounterName('counter_name',
+ 'stage_name',
+ 'step_nam'))
+
+ # Testing objects with an IOTarget.
+ self.assertEqual(CounterName('counter_name',
+ 'stage_name',
+ 'step_name',
+ io_target=counters.side_input_id(1, 's9')),
+ CounterName('counter_name',
+ 'stage_name',
+ 'step_name',
+ io_target=counters.side_input_id(1, 's9')))
+ self.assertNotEqual(CounterName('counter_name',
+ 'stage_name',
+ 'step_name',
+ io_target=counters.side_input_id(1, 's')),
+ CounterName('counter_name',
+ 'stage_name',
+ 'step_name',
+ io_target=counters.side_input_id(1, 's9')))
+
+ def test_hash_two_objects(self):
+ self.assertEqual(hash(CounterName('counter_name',
+ 'stage_name',
+ 'step_name')),
+ hash(CounterName('counter_name',
+ 'stage_name',
+ 'step_name')))
+ self.assertNotEqual(hash(CounterName('counter_name',
+ 'stage_name',
+ 'step_name')),
+ hash(CounterName('counter_name',
+ 'stage_name',
+ 'step_nam')))
+
+
+if __name__ == '__main__':
+ unittest.main()
[2/2] beam git commit: This closes #3747
Posted by al...@apache.org.
This closes #3747
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/260c4fce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/260c4fce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/260c4fce
Branch: refs/heads/master
Commit: 260c4fce8dd61c8e50c2bb3f3da671358a773c84
Parents: 5181e61 185daff
Author: Ahmet Altay <al...@google.com>
Authored: Thu Aug 24 09:48:16 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Aug 24 09:48:16 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/counters.py | 58 +++++++++------
sdks/python/apache_beam/utils/counters_test.py | 78 +++++++++++++++++++++
2 files changed, 115 insertions(+), 21 deletions(-)
----------------------------------------------------------------------