You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/19 23:02:35 UTC
[1/2] beam git commit: Adding lull tracking for Python sampler
Repository: beam
Updated Branches:
refs/heads/master 41f16123b -> 49c392790
Adding lull tracking for Python sampler
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21cdc85c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21cdc85c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21cdc85c
Branch: refs/heads/master
Commit: 21cdc85cfa8a06208a7f0a6736cc7d5886d4c8de
Parents: 41f1612
Author: Pablo <pa...@google.com>
Authored: Thu Oct 19 12:50:46 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Oct 19 16:02:03 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/worker/statesampler.pyx | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/21cdc85c/sdks/python/apache_beam/runners/worker/statesampler.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index f0527c6..1e37196 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -74,12 +74,16 @@ cdef inline int64_t get_nsec_time() nogil:
class StateSamplerInfo(object):
"""Info for current state and transition statistics of StateSampler."""
- def __init__(self, state_name, transition_count):
+ def __init__(self, state_name, transition_count, time_since_transition):
self.state_name = state_name
self.transition_count = transition_count
+ self.time_since_transition = time_since_transition
def __repr__(self):
- return '<StateSamplerInfo %s %d>' % (self.state_name, self.transition_count)
+ return ('<StateSamplerInfo state: %s time: %dns transitions: %d>'
+ % (self.state_name,
+ self.time_since_transition,
+ self.transition_count))
# Default period for sampling current state of pipeline execution.
@@ -105,6 +109,7 @@ cdef class StateSampler(object):
cdef pythread.PyThread_type_lock lock
cdef public int64_t state_transition_count
+ cdef int64_t time_since_transition
cdef int32_t current_state_index
@@ -122,6 +127,8 @@ cdef class StateSampler(object):
self.scoped_states_by_name = {}
self.current_state_index = 0
+ self.time_since_transition = 0
+ self.state_transition_count = 0
unknown_state = ScopedState(self, 'unknown', self.current_state_index)
pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK)
self.scoped_states_by_index = [unknown_state]
@@ -142,6 +149,7 @@ cdef class StateSampler(object):
def run(self):
cdef int64_t last_nsecs = get_nsec_time()
cdef int64_t elapsed_nsecs
+ cdef int64_t latest_transition_count = self.state_transition_count
with nogil:
while True:
usleep(self.sampling_period_ms * 1000)
@@ -155,6 +163,10 @@ cdef class StateSampler(object):
nsecs_ptr = &(<ScopedState>PyList_GET_ITEM(
self.scoped_states_by_index, self.current_state_index)).nsecs
nsecs_ptr[0] += elapsed_nsecs
+ if latest_transition_count != self.state_transition_count:
+ self.time_since_transition = 0
+ latest_transition_count = self.state_transition_count
+ self.time_since_transition += elapsed_nsecs
last_nsecs += elapsed_nsecs
finally:
pythread.PyThread_release_lock(self.lock)
@@ -182,7 +194,8 @@ cdef class StateSampler(object):
"""Returns StateSamplerInfo with transition statistics."""
return StateSamplerInfo(
self.scoped_states_by_index[self.current_state_index].name,
- self.state_transition_count)
+ self.state_transition_count,
+ self.time_since_transition)
# TODO(pabloem): Make state_name required once all callers migrate,
# and the legacy path is removed.
[2/2] beam git commit: This closes #3936
Posted by ch...@apache.org.
This closes #3936
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49c39279
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49c39279
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49c39279
Branch: refs/heads/master
Commit: 49c3927905d606990ea1d36f1dea0a8c86c11f30
Parents: 41f1612 21cdc85
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Oct 19 16:02:19 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Oct 19 16:02:19 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/worker/statesampler.pyx | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------