You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this text copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/05/09 15:39:36 UTC

[beam] branch master updated: StateSampler knows the execution thread it tracks.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e40357  StateSampler knows the execution thread it tracks.
1e40357 is described below

commit 1e40357edf3e9940bb898bcd7b6c5516d685f908
Author: Pablo <pa...@google.com>
AuthorDate: Mon May 7 16:53:46 2018 -0700

    StateSampler knows the execution thread it tracks.
---
 sdks/python/apache_beam/runners/worker/statesampler.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/statesampler.py b/sdks/python/apache_beam/runners/worker/statesampler.py
index d398092..8a00079 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler.py
@@ -47,7 +47,10 @@ def get_current_tracker():
 
 StateSamplerInfo = namedtuple(
     'StateSamplerInfo',
-    ['state_name', 'transition_count', 'time_since_transition'])
+    ['state_name',
+     'transition_count',
+     'time_since_transition',
+     'tracked_thread'])
 
 
 # Default period for sampling current state of pipeline execution.
@@ -63,6 +66,7 @@ class StateSampler(statesampler_impl.StateSampler):
     self._counter_factory = counter_factory
     self._states_by_name = {}
     self.sampling_period_ms = sampling_period_ms
+    self.tracked_thread = None
     super(StateSampler, self).__init__(sampling_period_ms)
 
   def stop_if_still_running(self):
@@ -70,6 +74,7 @@ class StateSampler(statesampler_impl.StateSampler):
       self.stop()
 
   def start(self):
+    self.tracked_thread = threading.current_thread()
     set_current_tracker(self)
     execution.metrics_startup()
     super(StateSampler, self).start()
@@ -80,7 +85,8 @@ class StateSampler(statesampler_impl.StateSampler):
     return StateSamplerInfo(
         self.current_state().name,
         self.state_transition_count,
-        self.time_since_transition)
+        self.time_since_transition,
+        self.tracked_thread)
 
   def scoped_state(self,
                    step_name,

-- 
To stop receiving notification emails like this one, please contact
pabloem@apache.org.