You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/05/01 19:47:15 UTC
[beam] branch master updated: [BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a0036d5 [BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)
a0036d5 is described below
commit a0036d5ab77be31ed090b3338faf2b32784399e0
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue May 1 12:47:03 2018 -0700
[BEAM-3042] Refactor of TransformIOCounters (performance, inheritance). (#5075)
* Refactor of TransformIOCounters (performance, inheritance).
---
.../apache_beam/runners/worker/opcounters.pxd | 15 ++--
.../apache_beam/runners/worker/opcounters.py | 90 +++++++++++++---------
.../apache_beam/runners/worker/operations.py | 2 +-
.../apache_beam/runners/worker/sideinputs.py | 7 +-
4 files changed, 69 insertions(+), 45 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.pxd b/sdks/python/apache_beam/runners/worker/opcounters.pxd
index 40ca72d..0bcd428 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.pxd
+++ b/sdks/python/apache_beam/runners/worker/opcounters.pxd
@@ -22,21 +22,26 @@ from apache_beam.utils.counters cimport Counter
cdef class TransformIOCounter(object):
+ cdef readonly object _counter_factory
+ cdef readonly object _state_sampler
+ cdef Counter bytes_read_counter
+ cdef object scoped_state
+ cdef object _latest_step
+
cpdef update_current_step(self)
cpdef add_bytes_read(self, libc.stdint.int64_t n)
cpdef __enter__(self)
cpdef __exit__(self, exc_type, exc_value, traceback)
+cdef class NoOpTransformIOCounter(TransformIOCounter):
+ pass
+
+
cdef class SideInputReadCounter(TransformIOCounter):
- cdef readonly object _counter_factory
- cdef readonly object _state_sampler
cdef readonly object declaring_step
cdef readonly object input_index
- cdef Counter bytes_read_counter
- cdef object scoped_state
-
cdef class SumAccumulator(object):
cdef libc.stdint.int64_t _value
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index 17fead2..0e4ee0a 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -41,24 +41,66 @@ class TransformIOCounter(object):
Some examples of IO can be side inputs, shuffle, or streaming state.
"""
+ def __init__(self, counter_factory, state_sampler):
+ """Create a new IO read counter.
+
+ Args:
+ counter_factory: A counters.CounterFactory to create byte counters.
+ state_sampler: A statesampler.StateSampler to transition into read states.
+ """
+ self._counter_factory = counter_factory
+ self._state_sampler = state_sampler
+ self._latest_step = None
+ self.bytes_read_counter = None
+ self.scoped_state = None
+
def update_current_step(self):
- """Update the current step within a stage as it may have changed.
+ """Update the current running step.
- If the state changed, it would mean that an initial step passed a
- data-accessor (such as a side input / shuffle Iterable) down to the
- next step in a stage.
+ Due to the fusion optimization, user code may choose to emit the data
+ structure that holds side inputs (Iterable, Dict, or others). This call
+ updates the current step, to attribute the data consumption to the step
+ that is responsible for actual consumption.
+
+ CounterName uses the io_target field for information pertinent to the
+ consumption of IO.
"""
+ current_state = self._state_sampler.current_state()
+ current_step_name = current_state.name.step_name
+ if current_step_name != self._latest_step:
+ self._latest_step = current_step_name
+ self._update_counters_for_requesting_step(current_step_name)
+
+ def _update_counters_for_requesting_step(self, step_name):
pass
def add_bytes_read(self, count):
- pass
+ if count > 0 and self.bytes_read_counter:
+ self.bytes_read_counter.update(count)
+
+ def __enter__(self):
+ self.scoped_state.__enter__()
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ self.scoped_state.__exit__(exception_type, exception_value, traceback)
+
+
+class NoOpTransformIOCounter(TransformIOCounter):
+ """All operations for IO tracking are no-ops."""
+
+ def __init__(self):
+ super(NoOpTransformIOCounter, self).__init__(None, None)
- def __exit__(self, exc_type, exc_value, traceback):
- """Exit the IO state."""
+ def update_current_step(self):
pass
def __enter__(self):
- """Enter the IO state. This should track time spent blocked on IO."""
+ pass
+
+ def __exit__(self, exception_type, exception_value, traceback):
+ pass
+
+ def add_bytes_read(self, count):
pass
@@ -93,8 +135,7 @@ class SideInputReadCounter(TransformIOCounter):
side input, and input_index is the index of the PCollectionView within
the list of inputs.
"""
- self._counter_factory = counter_factory
- self._state_sampler = state_sampler
+ super(SideInputReadCounter, self).__init__(counter_factory, state_sampler)
self.declaring_step = declaring_step
self.input_index = input_index
@@ -102,40 +143,19 @@ class SideInputReadCounter(TransformIOCounter):
# step. We check the current state to create the internal counters.
self.update_current_step()
- def update_current_step(self):
- """Update the current running step.
-
- Due to the fusion optimization, user code may choose to emit the data
- structure that holds side inputs (Iterable, Dict, or others). This call
- updates the current step, to attribute the data consumption to the step
- that is responsible for actual consumption.
-
- CounterName uses the io_target field for information pertinent to the
- consumption of side inputs.
- """
- current_state = self._state_sampler.current_state()
- operation_name = current_state.name.step_name
+ def _update_counters_for_requesting_step(self, step_name):
+ side_input_id = counters.side_input_id(step_name, self.input_index)
self.scoped_state = self._state_sampler.scoped_state(
self.declaring_step,
'read-sideinput',
- io_target=counters.side_input_id(operation_name, self.input_index))
+ io_target=side_input_id)
self.bytes_read_counter = self._counter_factory.get_counter(
CounterName(
'read-sideinput-byte-count',
step_name=self.declaring_step,
- io_target=counters.side_input_id(operation_name, self.input_index)),
+ io_target=side_input_id),
Counter.SUM)
- def add_bytes_read(self, count):
- if count > 0:
- self.bytes_read_counter.update(count)
-
- def __enter__(self):
- self.scoped_state.__enter__()
-
- def __exit__(self, exception_type, exception_value, traceback):
- self.scoped_state.__exit__(exception_type, exception_value, traceback)
-
class SumAccumulator(object):
"""Accumulator for collecting byte counts."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index b5a75a5..e30effc 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -336,7 +336,7 @@ class DoOperation(Operation):
# Inputs are 1-indexed, so we add 1 to i in the side input id
input_index=i + 1)
else:
- si_counter = opcounters.TransformIOCounter()
+ si_counter = opcounters.NoOpTransformIOCounter()
iterator_fn = sideinputs.get_iterator_fn_for_sources(
sources, read_counter=si_counter)
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index 7715785..d2599fd 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -73,16 +73,15 @@ class PrefetchingSourceSetIterable(object):
# Whether an error was encountered in any source reader.
self.has_errored = False
- self.read_counter = read_counter or opcounters.TransformIOCounter()
-
+ self.read_counter = read_counter or opcounters.NoOpTransformIOCounter()
self.reader_threads = []
self._start_reader_threads()
def add_byte_counter(self, reader):
"""Adds byte counter observer to a side input reader.
- If the 'sideinput_io_metrics' experiment flag is not passed in, then nothing
- is attached to the reader.
+ If the 'sideinput_io_metrics' experiment flag is not passed in, then
+ nothing is attached to the reader.
Args:
reader: A reader that should inherit from ObservableMixin to have
--
To stop receiving notification emails like this one, please contact
pabloem@apache.org.