You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/24 16:48:27 UTC

[1/2] beam git commit: Adding IOTargetName and unittests for CounterName

Repository: beam
Updated Branches:
  refs/heads/master 5181e619f -> 260c4fce8


Adding IOTargetName and unittests for CounterName


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/185daffa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/185daffa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/185daffa

Branch: refs/heads/master
Commit: 185daffa53595b3a6d900252d69132c85013aa4c
Parents: 5181e61
Author: Pablo <pa...@google.com>
Authored: Tue Aug 22 16:53:32 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Aug 24 09:47:40 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/counters.py      | 58 +++++++++------
 sdks/python/apache_beam/utils/counters_test.py | 78 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index 5d029dc..08685aa 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -23,40 +23,56 @@
 For internal use only; no backwards-compatibility guarantees.
 """
 
+from collections import namedtuple
 import threading
+
 from apache_beam.transforms import cy_combiners
 
 
-class CounterName(object):
+# Information identifying the IO being measured by a counter.
+IOTargetName = namedtuple('IOTargetName', ['side_input_step_name',
+                                           'side_input_index',
+                                           'original_shuffle_step_name'])
+
+
+def side_input_id(step_name, input_index):
+  """Create an IOTargetName that identifies the reading of a side input."""
+  return IOTargetName(step_name, input_index, None)
+
+
+def shuffle_id(step_name):
+  """Create an IOTargetName that identifies a GBK step."""
+  return IOTargetName(None, None, step_name)
+
+
+_CounterName = namedtuple('_CounterName', ['name',
+                                           'stage_name',
+                                           'step_name',
+                                           'system_name',
+                                           'namespace',
+                                           'origin',
+                                           'output_index',
+                                           'io_target'])
+
+
+class CounterName(_CounterName):
   """Naming information for a counter."""
   SYSTEM = object()
   USER = object()
 
-  def __init__(self, name, stage_name=None, step_name=None,
-               system_name=None, namespace=None,
-               origin=None, output_index=None):
-    self.name = name
-    self.origin = origin or CounterName.SYSTEM
-    self.namespace = namespace
-    self.stage_name = stage_name
-    self.step_name = step_name
-    self.system_name = system_name
-    self.output_index = output_index
-
-  def __hash__(self):
-    return hash((self.name,
-                 self.origin,
-                 self.namespace,
-                 self.stage_name,
-                 self.step_name,
-                 self.system_name,
-                 self.output_index))
+  def __new__(cls, name, stage_name=None, step_name=None,
+              system_name=None, namespace=None,
+              origin=None, output_index=None, io_target=None):
+    origin = origin or CounterName.SYSTEM
+    return super(CounterName, cls).__new__(cls, name, stage_name, step_name,
+                                           system_name, namespace,
+                                           origin, output_index, io_target)
 
   def __str__(self):
     return '%s' % self._str_internal()
 
   def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+    return '<CounterName<%s> at %s>' % (self._str_internal(), hex(id(self)))
 
   def _str_internal(self):
     if self.origin == CounterName.USER:

http://git-wip-us.apache.org/repos/asf/beam/blob/185daffa/sdks/python/apache_beam/utils/counters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters_test.py b/sdks/python/apache_beam/utils/counters_test.py
new file mode 100644
index 0000000..37cab88
--- /dev/null
+++ b/sdks/python/apache_beam/utils/counters_test.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for counters and counter names."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from apache_beam.utils import counters
+from apache_beam.utils.counters import CounterName
+
+
+class CounterNameTest(unittest.TestCase):
+
+  def test_equal_objects(self):
+    self.assertEqual(CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name'),
+                     CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name'))
+    self.assertNotEqual(CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name'),
+                        CounterName('counter_name',
+                                    'stage_name',
+                                    'step_nam'))
+
+    # Testing objects with an IOTarget.
+    self.assertEqual(CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name',
+                                 io_target=counters.side_input_id(1, 's9')),
+                     CounterName('counter_name',
+                                 'stage_name',
+                                 'step_name',
+                                 io_target=counters.side_input_id(1, 's9')))
+    self.assertNotEqual(CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name',
+                                    io_target=counters.side_input_id(1, 's')),
+                        CounterName('counter_name',
+                                    'stage_name',
+                                    'step_name',
+                                    io_target=counters.side_input_id(1, 's9')))
+
+  def test_hash_two_objects(self):
+    self.assertEqual(hash(CounterName('counter_name',
+                                      'stage_name',
+                                      'step_name')),
+                     hash(CounterName('counter_name',
+                                      'stage_name',
+                                      'step_name')))
+    self.assertNotEqual(hash(CounterName('counter_name',
+                                         'stage_name',
+                                         'step_name')),
+                        hash(CounterName('counter_name',
+                                         'stage_name',
+                                         'step_nam')))
+
+
+if __name__ == '__main__':
+  unittest.main()


[2/2] beam git commit: This closes #3747

Posted by al...@apache.org.
This closes #3747


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/260c4fce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/260c4fce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/260c4fce

Branch: refs/heads/master
Commit: 260c4fce8dd61c8e50c2bb3f3da671358a773c84
Parents: 5181e61 185daff
Author: Ahmet Altay <al...@google.com>
Authored: Thu Aug 24 09:48:16 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Aug 24 09:48:16 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/counters.py      | 58 +++++++++------
 sdks/python/apache_beam/utils/counters_test.py | 78 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 21 deletions(-)
----------------------------------------------------------------------