You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2020/07/01 17:51:57 UTC
[beam] branch master updated: [BEAM-10291] Adding full thread dump
upon lull detection (#12047)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f1eebfa [BEAM-10291] Adding full thread dump upon lull detection (#12047)
f1eebfa is described below
commit f1eebfa32cebad89e8ca629cfc4010c468e1835c
Author: David Yan <da...@apache.org>
AuthorDate: Wed Jul 1 10:51:35 2020 -0700
[BEAM-10291] Adding full thread dump upon lull detection (#12047)
---
.../apache_beam/runners/worker/sdk_worker.py | 26 ++++++++++--
.../apache_beam/runners/worker/sdk_worker_test.py | 49 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index ecf93d7..5cbc498 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -65,6 +65,7 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
+from apache_beam.runners.worker.worker_status import thread_dump
from apache_beam.utils import thread_pool_executor
if TYPE_CHECKING:
@@ -75,11 +76,14 @@ _LOGGER = logging.getLogger(__name__)
# This SDK harness will (by default), log a "lull" in processing if it sees no
# transitions in over 5 minutes.
-# 5 minutes * 60 seconds * 1020 millis * 1000 micros * 1000 nanoseconds
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
+# Full thread dump is performed at most every 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_S = 20 * 60
+
class ShortIdCache(object):
""" Cache for MonitoringInfo "short ids"
@@ -465,6 +469,7 @@ class SdkWorker(object):
self.profiler_factory = profiler_factory
self.log_lull_timeout_ns = (
log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
+ self._last_full_thread_dump_secs = 0
def do_instruction(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> beam_fn_api_pb2.InstructionResponse
@@ -545,8 +550,10 @@ class SdkWorker(object):
error='Instruction not running: %s' % instruction_id)
def _log_lull_in_bundle_processor(self, processor):
- state_sampler = processor.state_sampler
- sampler_info = state_sampler.get_info()
+ # Read the current sampler info from the processor's state sampler and
+ # hand it to _log_lull_sampler_info, which checks it against
+ # self.log_lull_timeout_ns before logging anything.
+ sampler_info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(sampler_info)
+
+ def _log_lull_sampler_info(self, sampler_info):
if (sampler_info and sampler_info.time_since_transition and
sampler_info.time_since_transition > self.log_lull_timeout_ns):
step_name = sampler_info.state_name.step_name
@@ -570,6 +577,19 @@ class SdkWorker(object):
step_name_log,
stack_trace)
+ if self._should_log_full_thread_dump():
+ self._log_full_thread_dump()
+
+  def _should_log_full_thread_dump(self):
+    """Report whether another full thread dump may be emitted now.
+
+    Compares the current ``time.time()`` with the timestamp of the previous
+    dump plus ``LOG_LULL_FULL_THREAD_DUMP_S``. When that interval has elapsed
+    the stored timestamp is advanced to the current time as a side effect.
+
+    Returns:
+      True if a full thread dump should be logged now, False otherwise.
+    """
+    current_secs = time.time()
+    earliest_next_dump_secs = (
+        self._last_full_thread_dump_secs + LOG_LULL_FULL_THREAD_DUMP_S)
+    if current_secs <= earliest_next_dump_secs:
+      # Still inside the throttling window; leave the timestamp untouched.
+      return False
+    self._last_full_thread_dump_secs = current_secs
+    return True
+
+ def _log_full_thread_dump(self):
+ # Thin wrapper around the imported thread_dump() helper. It is a separate
+ # method so the unit test added in this change can patch it by name.
+ # NOTE(review): the value returned by thread_dump() is discarded here;
+ # confirm that the helper itself writes the dump to the log.
+ thread_dump()
+
def process_bundle_progress(self,
request, # type: beam_fn_api_pb2.ProcessBundleProgressRequest
instruction_id # type: str
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 59baf15..a950ded 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -25,11 +25,14 @@ from __future__ import print_function
import contextlib
import logging
+import threading
+import time
import unittest
from builtins import range
from collections import namedtuple
import grpc
+import mock
from apache_beam.coders import VarIntCoder
from apache_beam.portability.api import beam_fn_api_pb2
@@ -38,8 +41,11 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import metrics_pb2
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statecache
+from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.sdk_worker import CachingStateHandler
+from apache_beam.runners.worker.sdk_worker import SdkWorker
from apache_beam.utils import thread_pool_executor
+from apache_beam.utils.counters import CounterName
_LOGGER = logging.getLogger(__name__)
@@ -121,6 +127,49 @@ class SdkWorkerTest(unittest.TestCase):
def test_fn_registration(self):
self._check_fn_registration_multi_request((1, 4), (4, 4))
+  def test_log_lull_in_bundle_processor(self):
+    """Check lull logging and the throttling of the full thread dump.
+
+    Calls ``SdkWorker._log_lull_sampler_info`` three times with the same
+    sampler info while ``time.time`` is patched, and verifies that:
+
+    * the first call logs a warning whose positional arguments mention the
+      counter name, the step name and this test's own name;
+    * ``_log_full_thread_dump`` is invoked on the first call;
+    * it is not invoked again 6 minutes later;
+    * it is invoked again 21 minutes later.
+    """
+    bundle_processor_cache = mock.MagicMock()
+    worker = SdkWorker(bundle_processor_cache)
+
+    # NOTE(review): 400000000000 is presumably the time since the last
+    # transition in nanoseconds (above the 5 minute default lull timeout) --
+    # confirm against the StateSamplerInfo field order.
+    sampler_info = statesampler.StateSamplerInfo(
+        CounterName('progress-msecs', 'stage_name', 'step_name'),
+        1,
+        400000000000,
+        threading.current_thread())
+
+    now = time.time()
+    log_full_thread_dump_fn_name = \
+        'apache_beam.runners.worker.sdk_worker.SdkWorker._log_full_thread_dump'
+    with mock.patch('logging.Logger.warning') as warn_mock:
+      with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+        with mock.patch('time.time') as time_mock:
+          time_mock.return_value = now
+          worker._log_lull_sampler_info(sampler_info)
+
+          # call_args[0] is the tuple of positional arguments of the most
+          # recent warning() call. Only read it: assigning to call_args would
+          # overwrite what the mock recorded.
+          processing_template = warn_mock.call_args[0][1]
+          step_name_template = warn_mock.call_args[0][2]
+          stack_trace = warn_mock.call_args[0][3]
+
+          self.assertIn('progress-msecs', processing_template)
+          self.assertIn('step_name', step_name_template)
+          self.assertIn('test_log_lull_in_bundle_processor', stack_trace)
+
+          log_full_thread_dump.assert_called_once_with()
+
+    # 6 minutes after the first dump: inside the 20 minute throttling window.
+    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 6 * 60  # 6 minutes
+        worker._log_lull_sampler_info(sampler_info)
+        self.assertFalse(
+            log_full_thread_dump.called,
+            'log_full_thread_dump should not be called.')
+
+    # 21 minutes after the first dump: the throttling window has elapsed.
+    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 21 * 60  # 21 minutes
+        worker._log_lull_sampler_info(sampler_info)
+        log_full_thread_dump.assert_called_once_with()
+
class CachingStateHandlerTest(unittest.TestCase):
def test_caching(self):