You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/05/09 15:39:36 UTC
[beam] branch master updated: StateSampler knows the execution thread it tracks.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e40357 StateSampler knows the execution thread it tracks.
1e40357 is described below
commit 1e40357edf3e9940bb898bcd7b6c5516d685f908
Author: Pablo <pa...@google.com>
AuthorDate: Mon May 7 16:53:46 2018 -0700
StateSampler knows the execution thread it tracks.
---
sdks/python/apache_beam/runners/worker/statesampler.py | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index d398092..8a00079 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -47,7 +47,10 @@ def get_current_tracker():
StateSamplerInfo = namedtuple(
'StateSamplerInfo',
- ['state_name', 'transition_count', 'time_since_transition'])
+ ['state_name',
+ 'transition_count',
+ 'time_since_transition',
+ 'tracked_thread'])
# Default period for sampling current state of pipeline execution.
@@ -63,6 +66,7 @@ class StateSampler(statesampler_impl.StateSampler):
self._counter_factory = counter_factory
self._states_by_name = {}
self.sampling_period_ms = sampling_period_ms
+ self.tracked_thread = None
super(StateSampler, self).__init__(sampling_period_ms)
def stop_if_still_running(self):
@@ -70,6 +74,7 @@ class StateSampler(statesampler_impl.StateSampler):
self.stop()
def start(self):
+ self.tracked_thread = threading.current_thread()
set_current_tracker(self)
execution.metrics_startup()
super(StateSampler, self).start()
@@ -80,7 +85,8 @@ class StateSampler(statesampler_impl.StateSampler):
return StateSamplerInfo(
self.current_state().name,
self.state_transition_count,
- self.time_since_transition)
+ self.time_since_transition,
+ self.tracked_thread)
def scoped_state(self,
step_name,
--
To stop receiving notification emails like this one, please contact
pabloem@apache.org.