You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/05/01 19:47:15 UTC

[beam] branch master updated: [BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a0036d5  [BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)
a0036d5 is described below

commit a0036d5ab77be31ed090b3338faf2b32784399e0
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue May 1 12:47:03 2018 -0700

    [BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)
    
    * Refactor of TransformIOCounters (performance, inheritance).
---
 .../apache_beam/runners/worker/opcounters.pxd      | 15 ++--
 .../apache_beam/runners/worker/opcounters.py       | 90 +++++++++++++---------
 .../apache_beam/runners/worker/operations.py       |  2 +-
 .../apache_beam/runners/worker/sideinputs.py       |  7 +-
 4 files changed, 69 insertions(+), 45 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/opcounters.pxd b/sdks/python/apache_beam/runners/worker/opcounters.pxd
index 40ca72d..0bcd428 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.pxd
+++ b/sdks/python/apache_beam/runners/worker/opcounters.pxd
@@ -22,21 +22,26 @@ from apache_beam.utils.counters cimport Counter
 
 
 cdef class TransformIOCounter(object):
+  cdef readonly object _counter_factory
+  cdef readonly object _state_sampler
+  cdef Counter bytes_read_counter
+  cdef object scoped_state
+  cdef object _latest_step
+
   cpdef update_current_step(self)
   cpdef add_bytes_read(self, libc.stdint.int64_t n)
   cpdef __enter__(self)
   cpdef __exit__(self, exc_type, exc_value, traceback)
 
 
+cdef class NoOpTransformIOCounter(TransformIOCounter):
+  pass
+
+
 cdef class SideInputReadCounter(TransformIOCounter):
-  cdef readonly object _counter_factory
-  cdef readonly object _state_sampler
   cdef readonly object declaring_step
   cdef readonly object input_index
 
-  cdef Counter bytes_read_counter
-  cdef object scoped_state
-
 
 cdef class SumAccumulator(object):
   cdef libc.stdint.int64_t _value
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 17fead2..0e4ee0a 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -41,24 +41,66 @@ class TransformIOCounter(object):
   Some examples of IO can be side inputs, shuffle, or streaming state.
   """
 
+  def __init__(self, counter_factory, state_sampler):
+    """Create a new IO read counter.
+
+    Args:
+      counter_factory: A counters.CounterFactory to create byte counters.
+      state_sampler: A statesampler.StateSampler to transition into read states.
+    """
+    self._counter_factory = counter_factory
+    self._state_sampler = state_sampler
+    self._latest_step = None
+    self.bytes_read_counter = None
+    self.scoped_state = None
+
   def update_current_step(self):
-    """Update the current step within a stage as it may have changed.
+    """Update the current running step.
 
-    If the state changed, it would mean that an initial step passed a
-    data-accessor (such as a side input / shuffle Iterable) down to the
-    next step in a stage.
+    Due to the fusion optimization, user code may choose to emit the data
+    structure that holds side inputs (Iterable, Dict, or others). This call
+    updates the current step, to attribute the data consumption to the step
+    that is responsible for actual consumption.
+
+    CounterName uses the io_target field for information pertinent to the
+    consumption of IO.
     """
+    current_state = self._state_sampler.current_state()
+    current_step_name = current_state.name.step_name
+    if current_step_name != self._latest_step:
+      self._latest_step = current_step_name
+      self._update_counters_for_requesting_step(current_step_name)
+
+  def _update_counters_for_requesting_step(self, step_name):
     pass
 
   def add_bytes_read(self, count):
-    pass
+    if count > 0 and self.bytes_read_counter:
+      self.bytes_read_counter.update(count)
+
+  def __enter__(self):
+    self.scoped_state.__enter__()
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self.scoped_state.__exit__(exception_type, exception_value, traceback)
+
+
+class NoOpTransformIOCounter(TransformIOCounter):
+  """All operations for IO tracking are no-ops."""
+
+  def __init__(self):
+    super(NoOpTransformIOCounter, self).__init__(None, None)
 
-  def __exit__(self, exc_type, exc_value, traceback):
-    """Exit the IO state."""
+  def update_current_step(self):
     pass
 
   def __enter__(self):
-    """Enter the IO state. This should track time spent blocked on IO."""
+    pass
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    pass
+
+  def add_bytes_read(self, count):
     pass
 
 
@@ -93,8 +135,7 @@ class SideInputReadCounter(TransformIOCounter):
     side input, and input_index is the index of the PCollectionView within
     the list of inputs.
     """
-    self._counter_factory = counter_factory
-    self._state_sampler = state_sampler
+    super(SideInputReadCounter, self).__init__(counter_factory, state_sampler)
     self.declaring_step = declaring_step
     self.input_index = input_index
 
@@ -102,40 +143,19 @@ class SideInputReadCounter(TransformIOCounter):
     # step. We check the current state to create the internal counters.
     self.update_current_step()
 
-  def update_current_step(self):
-    """Update the current running step.
-
-    Due to the fusion optimization, user code may choose to emit the data
-    structure that holds side inputs (Iterable, Dict, or others). This call
-    updates the current step, to attribute the data consumption to the step
-    that is responsible for actual consumption.
-
-    CounterName uses the io_target field for information pertinent to the
-    consumption of side inputs.
-    """
-    current_state = self._state_sampler.current_state()
-    operation_name = current_state.name.step_name
+  def _update_counters_for_requesting_step(self, step_name):
+    side_input_id = counters.side_input_id(step_name, self.input_index)
     self.scoped_state = self._state_sampler.scoped_state(
         self.declaring_step,
         'read-sideinput',
-        io_target=counters.side_input_id(operation_name, self.input_index))
+        io_target=side_input_id)
     self.bytes_read_counter = self._counter_factory.get_counter(
         CounterName(
             'read-sideinput-byte-count',
             step_name=self.declaring_step,
-            io_target=counters.side_input_id(operation_name, self.input_index)),
+            io_target=side_input_id),
         Counter.SUM)
 
-  def add_bytes_read(self, count):
-    if count > 0:
-      self.bytes_read_counter.update(count)
-
-  def __enter__(self):
-    self.scoped_state.__enter__()
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.scoped_state.__exit__(exception_type, exception_value, traceback)
-
 
 class SumAccumulator(object):
   """Accumulator for collecting byte counts."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index b5a75a5..e30effc 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -336,7 +336,7 @@ class DoOperation(Operation):
               # Inputs are 1-indexed, so we add 1 to i in the side input id
               input_index=i + 1)
         else:
-          si_counter = opcounters.TransformIOCounter()
+          si_counter = opcounters.NoOpTransformIOCounter()
       iterator_fn = sideinputs.get_iterator_fn_for_sources(
           sources, read_counter=si_counter)
 
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 7715785..d2599fd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -73,16 +73,15 @@ class PrefetchingSourceSetIterable(object):
     # Whether an error was encountered in any source reader.
     self.has_errored = False
 
-    self.read_counter = read_counter or opcounters.TransformIOCounter()
-
+    self.read_counter = read_counter or opcounters.NoOpTransformIOCounter()
     self.reader_threads = []
     self._start_reader_threads()
 
   def add_byte_counter(self, reader):
     """Adds byte counter observer to a side input reader.
 
-    If the 'sideinput_io_metrics' experiment flag is not passed in, then nothing
-    is attached to the reader.
+    If the 'sideinput_io_metrics' experiment flag is not passed in, then
+    nothing is attached to the reader.
 
     Args:
       reader: A reader that should inherit from ObservableMixin to have

-- 
To stop receiving notification emails like this one, please contact
pabloem@apache.org.