You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2020/07/01 17:51:57 UTC

[beam] branch master updated: [BEAM-10291] Adding full thread dump upon lull detection (#12047)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f1eebfa  [BEAM-10291] Adding full thread dump upon lull detection (#12047)
f1eebfa is described below

commit f1eebfa32cebad89e8ca629cfc4010c468e1835c
Author: David Yan <da...@apache.org>
AuthorDate: Wed Jul 1 10:51:35 2020 -0700

    [BEAM-10291] Adding full thread dump upon lull detection (#12047)
---
 .../apache_beam/runners/worker/sdk_worker.py       | 26 ++++++++++--
 .../apache_beam/runners/worker/sdk_worker_test.py  | 49 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index ecf93d7..5cbc498 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -65,6 +65,7 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
+from apache_beam.runners.worker.worker_status import thread_dump
 from apache_beam.utils import thread_pool_executor
 
 if TYPE_CHECKING:
@@ -75,11 +76,14 @@ _LOGGER = logging.getLogger(__name__)
 
 # This SDK harness will (by default), log a "lull" in processing if it sees no
 # transitions in over 5 minutes.
-# 5 minutes * 60 seconds * 1020 millis * 1000 micros * 1000 nanoseconds
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
 DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
 
 DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
 
+# Full thread dump is performed at most every 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_S = 20 * 60
+
 
 class ShortIdCache(object):
   """ Cache for MonitoringInfo "short ids"
@@ -465,6 +469,7 @@ class SdkWorker(object):
     self.profiler_factory = profiler_factory
     self.log_lull_timeout_ns = (
         log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
+    self._last_full_thread_dump_secs = 0
 
   def do_instruction(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> beam_fn_api_pb2.InstructionResponse
@@ -545,8 +550,10 @@ class SdkWorker(object):
           error='Instruction not running: %s' % instruction_id)
 
   def _log_lull_in_bundle_processor(self, processor):
-    state_sampler = processor.state_sampler
-    sampler_info = state_sampler.get_info()
+    sampler_info = processor.state_sampler.get_info()
+    self._log_lull_sampler_info(sampler_info)
+
+  def _log_lull_sampler_info(self, sampler_info):
     if (sampler_info and sampler_info.time_since_transition and
         sampler_info.time_since_transition > self.log_lull_timeout_ns):
       step_name = sampler_info.state_name.step_name
@@ -570,6 +577,19 @@ class SdkWorker(object):
           step_name_log,
           stack_trace)
 
+      if self._should_log_full_thread_dump():
+        self._log_full_thread_dump()
+
+  def _should_log_full_thread_dump(self):
+    now = time.time()
+    if self._last_full_thread_dump_secs + LOG_LULL_FULL_THREAD_DUMP_S < now:
+      self._last_full_thread_dump_secs = now
+      return True
+    return False
+
+  def _log_full_thread_dump(self):
+    thread_dump()
+
   def process_bundle_progress(self,
                               request,  # type: beam_fn_api_pb2.ProcessBundleProgressRequest
                               instruction_id  # type: str
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 59baf15..a950ded 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -25,11 +25,14 @@ from __future__ import print_function
 
 import contextlib
 import logging
+import threading
+import time
 import unittest
 from builtins import range
 from collections import namedtuple
 
 import grpc
+import mock
 
 from apache_beam.coders import VarIntCoder
 from apache_beam.portability.api import beam_fn_api_pb2
@@ -38,8 +41,11 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statecache
+from apache_beam.runners.worker import statesampler
 from apache_beam.runners.worker.sdk_worker import CachingStateHandler
+from apache_beam.runners.worker.sdk_worker import SdkWorker
 from apache_beam.utils import thread_pool_executor
+from apache_beam.utils.counters import CounterName
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -121,6 +127,49 @@ class SdkWorkerTest(unittest.TestCase):
   def test_fn_registration(self):
     self._check_fn_registration_multi_request((1, 4), (4, 4))
 
+  def test_log_lull_in_bundle_processor(self):
+    bundle_processor_cache = mock.MagicMock()
+    worker = SdkWorker(bundle_processor_cache)
+
+    sampler_info = statesampler.StateSamplerInfo(
+        CounterName('progress-msecs', 'stage_name', 'step_name'),
+        1,
+        400000000000,
+        threading.current_thread())
+
+    now = time.time()
+    log_full_thread_dump_fn_name = \
+        'apache_beam.runners.worker.sdk_worker.SdkWorker._log_full_thread_dump'
+    with mock.patch('logging.Logger.warning') as warn_mock:
+      with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+        with mock.patch('time.time') as time_mock:
+          time_mock.return_value = now
+          worker._log_lull_sampler_info(sampler_info)
+
+          processing_template = warn_mock.call_args[0][1]
+          step_name_template = warn_mock.call_args[0][2]
+          traceback = warn_mock.call_args = warn_mock.call_args[0][3]
+
+          self.assertIn('progress-msecs', processing_template)
+          self.assertIn('step_name', step_name_template)
+          self.assertIn('test_log_lull_in_bundle_processor', traceback)
+
+          log_full_thread_dump.assert_called_once_with()
+
+    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 6 * 60  # 6 minutes
+        worker._log_lull_sampler_info(sampler_info)
+        self.assertFalse(
+            log_full_thread_dump.called,
+            'log_full_thread_dump should not be called.')
+
+    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 21 * 60  # 21 minutes
+        worker._log_lull_sampler_info(sampler_info)
+        log_full_thread_dump.assert_called_once_with()
+
 
 class CachingStateHandlerTest(unittest.TestCase):
   def test_caching(self):