You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2021/10/14 16:04:43 UTC

[beam] branch master updated: Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d296688  Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler
     new bd0ac38  Merge pull request #15676 from y1chi/lull_logging
d296688 is described below

commit d2966886ae3d990056a71257d062e6067134675e
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Oct 6 17:51:58 2021 -0700

    Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler
---
 .../portability/fn_api_runner/fn_runner_test.py    | 20 ------
 .../apache_beam/runners/worker/sdk_worker.py       | 76 ----------------------
 .../apache_beam/runners/worker/sdk_worker_test.py  | 62 ------------------
 .../apache_beam/runners/worker/worker_status.py    | 76 +++++++++++++++++++++-
 .../runners/worker/worker_status_test.py           | 41 ++++++++++++
 5 files changed, 116 insertions(+), 159 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index c002963..ece4f28 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -58,13 +58,11 @@ from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.sdf_utils import RestrictionTrackerView
 from apache_beam.runners.worker import data_plane
-from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statesampler
 from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.tools import utils
 from apache_beam.transforms import environments
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
@@ -2002,24 +2000,6 @@ class FnApiBasedLullLoggingTest(unittest.TestCase):
             default(),
             progress_request_frequency=0.5))
 
-  def test_lull_logging(self):
-
-    try:
-      utils.check_compiled('apache_beam.runners.worker.opcounters')
-    except RuntimeError:
-      self.skipTest('Cython is not available')
-
-    with self.assertLogs(level='WARNING') as logs:
-      with self.create_pipeline() as p:
-        sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000  # Lull after 1 ms
-
-        _ = (p | beam.Create([1]) | beam.Map(time.sleep))
-
-    self.assertRegex(
-        ''.join(logs.output),
-        '.*Operation ongoing for over.*',
-        'Unable to find a lull logged for this job.')
-
 
 class StateBackedTestElementType(object):
   live_element_count = 0
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 6fdca4d..0951401 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -63,7 +63,6 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
-from apache_beam.runners.worker.worker_status import thread_dump
 from apache_beam.utils import thread_pool_executor
 from apache_beam.utils.sentinel import Sentinel
 
@@ -77,19 +76,8 @@ _VT = TypeVar('_VT')
 
 _LOGGER = logging.getLogger(__name__)
 
-# This SDK harness will (by default), log a "lull" in processing if it sees no
-# transitions in over 5 minutes.
-# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
-DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
-
 DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
 
-# Full thread dump is performed at most every 20 minutes.
-LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
-
-# Full thread dump is performed if the lull is more than 20 minutes.
-LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
-
 # The number of ProcessBundleRequest instruction ids the BundleProcessorCache
 # will remember for not running instructions.
 MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
@@ -578,16 +566,11 @@ class SdkWorker(object):
       bundle_processor_cache,  # type: BundleProcessorCache
       state_cache_metrics_fn=list,  # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
       profiler_factory=None,  # type: Optional[Callable[..., Profile]]
-      log_lull_timeout_ns=None,  # type: Optional[int]
   ):
     # type: (...) -> None
     self.bundle_processor_cache = bundle_processor_cache
     self.state_cache_metrics_fn = state_cache_metrics_fn
     self.profiler_factory = profiler_factory
-    self.log_lull_timeout_ns = (
-        log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
-    self._last_full_thread_dump_secs = 0.0
-    self._last_lull_logged_secs = 0.0
 
   def do_instruction(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> beam_fn_api_pb2.InstructionResponse
@@ -674,64 +657,6 @@ class SdkWorker(object):
         instruction_id=instruction_id,
         process_bundle_split=process_bundle_split)
 
-  def _log_lull_in_bundle_processor(self, processor):
-    # type: (bundle_processor.BundleProcessor) -> None
-    sampler_info = processor.state_sampler.get_info()
-    self._log_lull_sampler_info(sampler_info)
-
-  def _log_lull_sampler_info(self, sampler_info):
-    # type: (statesampler.StateSamplerInfo) -> None
-    if (sampler_info and sampler_info.time_since_transition and
-        sampler_info.time_since_transition > self.log_lull_timeout_ns and
-        self._passed_lull_timeout_since_last_log()):
-      step_name = sampler_info.state_name.step_name
-      state_name = sampler_info.state_name.name
-      lull_seconds = sampler_info.time_since_transition / 1e9
-      state_lull_log = (
-          'Operation ongoing for over %.2f seconds in state %s' %
-          (lull_seconds, state_name))
-      step_name_log = (' in step %s ' % step_name) if step_name else ''
-
-      exec_thread = getattr(sampler_info, 'tracked_thread', None)
-      if exec_thread is not None:
-        thread_frame = sys._current_frames().get(exec_thread.ident)  # pylint: disable=protected-access
-        stack_trace = '\n'.join(
-            traceback.format_stack(thread_frame)) if thread_frame else ''
-      else:
-        stack_trace = '-NOT AVAILABLE-'
-
-      _LOGGER.warning(
-          '%s%s without returning. Current Traceback:\n%s',
-          state_lull_log,
-          step_name_log,
-          stack_trace)
-
-      if self._should_log_full_thread_dump(lull_seconds):
-        self._log_full_thread_dump()
-
-  def _passed_lull_timeout_since_last_log(self) -> bool:
-    if (time.time() - self._last_lull_logged_secs >
-        self.log_lull_timeout_ns / 1e9):
-      self._last_lull_logged_secs = time.time()
-      return True
-    else:
-      return False
-
-  def _should_log_full_thread_dump(self, lull_seconds):
-    # type: (float) -> bool
-    if lull_seconds < LOG_LULL_FULL_THREAD_DUMP_LULL_S:
-      return False
-    now = time.time()
-    if (self._last_full_thread_dump_secs + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S
-        < now):
-      self._last_full_thread_dump_secs = now
-      return True
-    return False
-
-  def _log_full_thread_dump(self):
-    # type: () -> None
-    thread_dump()
-
   def process_bundle_progress(
       self,
       request,  # type: beam_fn_api_pb2.ProcessBundleProgressRequest
@@ -744,7 +669,6 @@ class SdkWorker(object):
       return beam_fn_api_pb2.InstructionResponse(
           instruction_id=instruction_id, error=traceback.format_exc())
     if processor:
-      self._log_lull_in_bundle_processor(processor)
       monitoring_infos = processor.monitoring_infos()
     else:
       # Return an empty response if we aren't running. This can happen
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 92a2f72..d7309c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -21,8 +21,6 @@
 
 import contextlib
 import logging
-import threading
-import time
 import unittest
 from collections import namedtuple
 
@@ -40,12 +38,10 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statecache
-from apache_beam.runners.worker import statesampler
 from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
 from apache_beam.runners.worker.sdk_worker import GlobalCachingStateHandler
 from apache_beam.runners.worker.sdk_worker import SdkWorker
 from apache_beam.utils import thread_pool_executor
-from apache_beam.utils.counters import CounterName
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -127,13 +123,6 @@ class SdkWorkerTest(unittest.TestCase):
   def test_fn_registration(self):
     self._check_fn_registration_multi_request((1, 4), (4, 4))
 
-  def _get_state_sampler_info_for_lull(self, lull_duration_s):
-    return statesampler.StateSamplerInfo(
-        CounterName('progress-msecs', 'stage_name', 'step_name'),
-        1,
-        lull_duration_s * 1e9,
-        threading.current_thread())
-
   def test_inactive_bundle_processor_returns_empty_progress_response(self):
     bundle_processor = mock.MagicMock()
     bundle_processor_cache = BundleProcessorCache(None, None, {})
@@ -283,57 +272,6 @@ class SdkWorkerTest(unittest.TestCase):
         hc.contains_string(
             'Bundle processing associated with instruction_id has failed'))
 
-  def test_log_lull_in_bundle_processor(self):
-    bundle_processor_cache = mock.MagicMock()
-    worker = SdkWorker(bundle_processor_cache)
-
-    now = time.time()
-    log_full_thread_dump_fn_name = \
-        'apache_beam.runners.worker.sdk_worker.SdkWorker._log_full_thread_dump'
-    with mock.patch('logging.Logger.warning') as warn_mock:
-      with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
-        with mock.patch('time.time') as time_mock:
-          time_mock.return_value = now
-          sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
-          worker._log_lull_sampler_info(sampler_info)
-
-          processing_template = warn_mock.call_args[0][1]
-          step_name_template = warn_mock.call_args[0][2]
-          traceback = warn_mock.call_args = warn_mock.call_args[0][3]
-
-          self.assertIn('progress-msecs', processing_template)
-          self.assertIn('step_name', step_name_template)
-          self.assertIn('test_log_lull_in_bundle_processor', traceback)
-
-          log_full_thread_dump.assert_called_once_with()
-
-    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
-      with mock.patch('time.time') as time_mock:
-        time_mock.return_value = now + 6 * 60  # 6 minutes
-        sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
-        worker._log_lull_sampler_info(sampler_info)
-        self.assertFalse(
-            log_full_thread_dump.called,
-            'log_full_thread_dump should not be called because only 6 minutes '
-            'have passed since the last dump.')
-
-    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
-      with mock.patch('time.time') as time_mock:
-        time_mock.return_value = now + 21 * 60  # 21 minutes
-        sampler_info = self._get_state_sampler_info_for_lull(10 * 60)
-        worker._log_lull_sampler_info(sampler_info)
-        self.assertFalse(
-            log_full_thread_dump.called,
-            'log_full_thread_dump should not be called because lull is only '
-            'for 10 minutes.')
-
-    with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
-      with mock.patch('time.time') as time_mock:
-        time_mock.return_value = now + 42 * 60  # 21 minutes after previous one
-        sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
-        worker._log_lull_sampler_info(sampler_info)
-        log_full_thread_dump.assert_called_once_with()
-
 
 class CachingStateHandlerTest(unittest.TestCase):
   def test_caching(self):
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py
index b699f07..652b01d 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -17,9 +17,11 @@
 
 """Worker status api handler for reporting SDK harness debug info."""
 
+import logging
 import queue
 import sys
 import threading
+import time
 import traceback
 from collections import defaultdict
 
@@ -36,6 +38,19 @@ try:
 except ImportError:
   hpy = None
 
+_LOGGER = logging.getLogger(__name__)
+
+# This SDK harness will (by default), log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+# Full thread dump is performed at most every 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
+
+# Full thread dump is performed if the lull is more than 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
+
 
 def thread_dump():
   """Get a thread dump for the current SDK worker harness. """
@@ -120,7 +135,11 @@ DONE = Sentinel.sentinel
 class FnApiWorkerStatusHandler(object):
   """FnApiWorkerStatusHandler handles worker status request from Runner. """
   def __init__(
-      self, status_address, bundle_process_cache=None, enable_heap_dump=False):
+      self,
+      status_address,
+      bundle_process_cache=None,
+      enable_heap_dump=False,
+      log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
     """Initialize FnApiWorkerStatusHandler.
 
     Args:
@@ -135,11 +154,20 @@ class FnApiWorkerStatusHandler(object):
     self._status_stub = beam_fn_api_pb2_grpc.BeamFnWorkerStatusStub(
         self._status_channel)
     self._responses = queue.Queue()
+    self.log_lull_timeout_ns = log_lull_timeout_ns
+    self._last_full_thread_dump_secs = 0.0
+    self._last_lull_logged_secs = 0.0
     self._server = threading.Thread(
         target=lambda: self._serve(), name='fn_api_status_handler')
     self._server.daemon = True
     self._enable_heap_dump = enable_heap_dump
     self._server.start()
+    self._lull_logger = threading.Thread(
+        target=lambda: self._log_lull_in_bundle_processor(
+            self._bundle_process_cache),
+        name='lull_operation_logger')
+    self._lull_logger.daemon = True
+    self._lull_logger.start()
 
   def _get_responses(self):
     while True:
@@ -177,3 +205,49 @@ class FnApiWorkerStatusHandler(object):
 
   def close(self):
     self._responses.put(DONE, timeout=5)
+
+  def _log_lull_in_bundle_processor(self, bundle_process_cache):
+    while True:
+      time.sleep(2 * 60)
+      if bundle_process_cache.active_bundle_processors:
+        for instruction in list(
+            bundle_process_cache.active_bundle_processors.keys()):
+          processor = bundle_process_cache.lookup(instruction)
+          if processor:
+            info = processor.state_sampler.get_info()
+            self._log_lull_sampler_info(info)
+
+  def _log_lull_sampler_info(self, sampler_info):
+    if not self._passed_lull_timeout_since_last_log():
+      return
+    if (sampler_info and sampler_info.time_since_transition and
+        sampler_info.time_since_transition > self.log_lull_timeout_ns):
+      step_name = sampler_info.state_name.step_name
+      state_name = sampler_info.state_name.name
+      lull_seconds = sampler_info.time_since_transition / 1e9
+      state_lull_log = (
+          'Operation ongoing for over %.2f seconds in state %s' %
+          (lull_seconds, state_name))
+      step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+      exec_thread = getattr(sampler_info, 'tracked_thread', None)
+      if exec_thread is not None:
+        thread_frame = sys._current_frames().get(exec_thread.ident)  # pylint: disable=protected-access
+        stack_trace = '\n'.join(
+            traceback.format_stack(thread_frame)) if thread_frame else ''
+      else:
+        stack_trace = '-NOT AVAILABLE-'
+
+      _LOGGER.warning(
+          '%s%s without returning. Current Traceback:\n%s',
+          state_lull_log,
+          step_name_log,
+          stack_trace)
+
+  def _passed_lull_timeout_since_last_log(self) -> bool:
+    if (time.time() - self._last_lull_logged_secs >
+        self.log_lull_timeout_ns / 1e9):
+      self._last_lull_logged_secs = time.time()
+      return True
+    else:
+      return False
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 2b4c7dc..7da5339 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -17,6 +17,7 @@
 
 import logging
 import threading
+import time
 import unittest
 
 import grpc
@@ -24,9 +25,11 @@ import mock
 
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import statesampler
 from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
 from apache_beam.runners.worker.worker_status import heap_dump
 from apache_beam.utils import thread_pool_executor
+from apache_beam.utils.counters import CounterName
 
 
 class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
@@ -83,6 +86,44 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
       self.assertIsNotNone(response.error)
     self.fn_status_handler.close()
 
+  def test_log_lull_in_bundle_processor(self):
+    def get_state_sampler_info_for_lull(lull_duration_s):
+      return statesampler.StateSamplerInfo(
+          CounterName('progress-msecs', 'stage_name', 'step_name'),
+          1,
+          lull_duration_s * 1e9,
+          threading.current_thread())
+
+    now = time.time()
+    with mock.patch('logging.Logger.warning') as warn_mock:
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now
+        sampler_info = get_state_sampler_info_for_lull(21 * 60)
+        self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+        processing_template = warn_mock.call_args[0][1]
+        step_name_template = warn_mock.call_args[0][2]
+        traceback = warn_mock.call_args = warn_mock.call_args[0][3]
+
+        self.assertIn('progress-msecs', processing_template)
+        self.assertIn('step_name', step_name_template)
+        self.assertIn('test_log_lull_in_bundle_processor', traceback)
+
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 6 * 60  # 6 minutes
+        sampler_info = get_state_sampler_info_for_lull(21 * 60)
+        self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 21 * 60  # 21 minutes
+        sampler_info = get_state_sampler_info_for_lull(10 * 60)
+        self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+      with mock.patch('time.time') as time_mock:
+        time_mock.return_value = now + 42 * 60  # 21 minutes after previous one
+        sampler_info = get_state_sampler_info_for_lull(21 * 60)
+        self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
 
 class HeapDumpTest(unittest.TestCase):
   @mock.patch('apache_beam.runners.worker.worker_status.hpy', None)