You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2021/10/14 16:04:43 UTC
[beam] branch master updated: Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d296688 Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler
new bd0ac38 Merge pull request #15676 from y1chi/lull_logging
d296688 is described below
commit d2966886ae3d990056a71257d062e6067134675e
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Oct 6 17:51:58 2021 -0700
Use daemon thread to report lull operation and move code to FnApiWorkerStatusHandler
---
.../portability/fn_api_runner/fn_runner_test.py | 20 ------
.../apache_beam/runners/worker/sdk_worker.py | 76 ----------------------
.../apache_beam/runners/worker/sdk_worker_test.py | 62 ------------------
.../apache_beam/runners/worker/worker_status.py | 76 +++++++++++++++++++++-
.../runners/worker/worker_status_test.py | 41 ++++++++++++
5 files changed, 116 insertions(+), 159 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index c002963..ece4f28 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -58,13 +58,11 @@ from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.sdf_utils import RestrictionTrackerView
from apache_beam.runners.worker import data_plane
-from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statesampler
from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.tools import utils
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms import window
@@ -2002,24 +2000,6 @@ class FnApiBasedLullLoggingTest(unittest.TestCase):
default(),
progress_request_frequency=0.5))
- def test_lull_logging(self):
-
- try:
- utils.check_compiled('apache_beam.runners.worker.opcounters')
- except RuntimeError:
- self.skipTest('Cython is not available')
-
- with self.assertLogs(level='WARNING') as logs:
- with self.create_pipeline() as p:
- sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000 # Lull after 1 ms
-
- _ = (p | beam.Create([1]) | beam.Map(time.sleep))
-
- self.assertRegex(
- ''.join(logs.output),
- '.*Operation ongoing for over.*',
- 'Unable to find a lull logged for this job.')
-
class StateBackedTestElementType(object):
live_element_count = 0
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 6fdca4d..0951401 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -63,7 +63,6 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
-from apache_beam.runners.worker.worker_status import thread_dump
from apache_beam.utils import thread_pool_executor
from apache_beam.utils.sentinel import Sentinel
@@ -77,19 +76,8 @@ _VT = TypeVar('_VT')
_LOGGER = logging.getLogger(__name__)
-# This SDK harness will (by default), log a "lull" in processing if it sees no
-# transitions in over 5 minutes.
-# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
-DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
-
DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 60
-# Full thread dump is performed at most every 20 minutes.
-LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
-
-# Full thread dump is performed if the lull is more than 20 minutes.
-LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
-
# The number of ProcessBundleRequest instruction ids the BundleProcessorCache
# will remember for not running instructions.
MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
@@ -578,16 +566,11 @@ class SdkWorker(object):
bundle_processor_cache, # type: BundleProcessorCache
state_cache_metrics_fn=list, # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
profiler_factory=None, # type: Optional[Callable[..., Profile]]
- log_lull_timeout_ns=None, # type: Optional[int]
):
# type: (...) -> None
self.bundle_processor_cache = bundle_processor_cache
self.state_cache_metrics_fn = state_cache_metrics_fn
self.profiler_factory = profiler_factory
- self.log_lull_timeout_ns = (
- log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
- self._last_full_thread_dump_secs = 0.0
- self._last_lull_logged_secs = 0.0
def do_instruction(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> beam_fn_api_pb2.InstructionResponse
@@ -674,64 +657,6 @@ class SdkWorker(object):
instruction_id=instruction_id,
process_bundle_split=process_bundle_split)
- def _log_lull_in_bundle_processor(self, processor):
- # type: (bundle_processor.BundleProcessor) -> None
- sampler_info = processor.state_sampler.get_info()
- self._log_lull_sampler_info(sampler_info)
-
- def _log_lull_sampler_info(self, sampler_info):
- # type: (statesampler.StateSamplerInfo) -> None
- if (sampler_info and sampler_info.time_since_transition and
- sampler_info.time_since_transition > self.log_lull_timeout_ns and
- self._passed_lull_timeout_since_last_log()):
- step_name = sampler_info.state_name.step_name
- state_name = sampler_info.state_name.name
- lull_seconds = sampler_info.time_since_transition / 1e9
- state_lull_log = (
- 'Operation ongoing for over %.2f seconds in state %s' %
- (lull_seconds, state_name))
- step_name_log = (' in step %s ' % step_name) if step_name else ''
-
- exec_thread = getattr(sampler_info, 'tracked_thread', None)
- if exec_thread is not None:
- thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access
- stack_trace = '\n'.join(
- traceback.format_stack(thread_frame)) if thread_frame else ''
- else:
- stack_trace = '-NOT AVAILABLE-'
-
- _LOGGER.warning(
- '%s%s without returning. Current Traceback:\n%s',
- state_lull_log,
- step_name_log,
- stack_trace)
-
- if self._should_log_full_thread_dump(lull_seconds):
- self._log_full_thread_dump()
-
- def _passed_lull_timeout_since_last_log(self) -> bool:
- if (time.time() - self._last_lull_logged_secs >
- self.log_lull_timeout_ns / 1e9):
- self._last_lull_logged_secs = time.time()
- return True
- else:
- return False
-
- def _should_log_full_thread_dump(self, lull_seconds):
- # type: (float) -> bool
- if lull_seconds < LOG_LULL_FULL_THREAD_DUMP_LULL_S:
- return False
- now = time.time()
- if (self._last_full_thread_dump_secs + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S
- < now):
- self._last_full_thread_dump_secs = now
- return True
- return False
-
- def _log_full_thread_dump(self):
- # type: () -> None
- thread_dump()
-
def process_bundle_progress(
self,
request, # type: beam_fn_api_pb2.ProcessBundleProgressRequest
@@ -744,7 +669,6 @@ class SdkWorker(object):
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id, error=traceback.format_exc())
if processor:
- self._log_lull_in_bundle_processor(processor)
monitoring_infos = processor.monitoring_infos()
else:
# Return an empty response if we aren't running. This can happen
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 92a2f72..d7309c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -21,8 +21,6 @@
import contextlib
import logging
-import threading
-import time
import unittest
from collections import namedtuple
@@ -40,12 +38,10 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import metrics_pb2
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statecache
-from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
from apache_beam.runners.worker.sdk_worker import GlobalCachingStateHandler
from apache_beam.runners.worker.sdk_worker import SdkWorker
from apache_beam.utils import thread_pool_executor
-from apache_beam.utils.counters import CounterName
_LOGGER = logging.getLogger(__name__)
@@ -127,13 +123,6 @@ class SdkWorkerTest(unittest.TestCase):
def test_fn_registration(self):
self._check_fn_registration_multi_request((1, 4), (4, 4))
- def _get_state_sampler_info_for_lull(self, lull_duration_s):
- return statesampler.StateSamplerInfo(
- CounterName('progress-msecs', 'stage_name', 'step_name'),
- 1,
- lull_duration_s * 1e9,
- threading.current_thread())
-
def test_inactive_bundle_processor_returns_empty_progress_response(self):
bundle_processor = mock.MagicMock()
bundle_processor_cache = BundleProcessorCache(None, None, {})
@@ -283,57 +272,6 @@ class SdkWorkerTest(unittest.TestCase):
hc.contains_string(
'Bundle processing associated with instruction_id has failed'))
- def test_log_lull_in_bundle_processor(self):
- bundle_processor_cache = mock.MagicMock()
- worker = SdkWorker(bundle_processor_cache)
-
- now = time.time()
- log_full_thread_dump_fn_name = \
- 'apache_beam.runners.worker.sdk_worker.SdkWorker._log_full_thread_dump'
- with mock.patch('logging.Logger.warning') as warn_mock:
- with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = now
- sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
- worker._log_lull_sampler_info(sampler_info)
-
- processing_template = warn_mock.call_args[0][1]
- step_name_template = warn_mock.call_args[0][2]
- traceback = warn_mock.call_args = warn_mock.call_args[0][3]
-
- self.assertIn('progress-msecs', processing_template)
- self.assertIn('step_name', step_name_template)
- self.assertIn('test_log_lull_in_bundle_processor', traceback)
-
- log_full_thread_dump.assert_called_once_with()
-
- with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = now + 6 * 60 # 6 minutes
- sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
- worker._log_lull_sampler_info(sampler_info)
- self.assertFalse(
- log_full_thread_dump.called,
- 'log_full_thread_dump should not be called because only 6 minutes '
- 'have passed since the last dump.')
-
- with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = now + 21 * 60 # 21 minutes
- sampler_info = self._get_state_sampler_info_for_lull(10 * 60)
- worker._log_lull_sampler_info(sampler_info)
- self.assertFalse(
- log_full_thread_dump.called,
- 'log_full_thread_dump should not be called because lull is only '
- 'for 10 minutes.')
-
- with mock.patch(log_full_thread_dump_fn_name) as log_full_thread_dump:
- with mock.patch('time.time') as time_mock:
- time_mock.return_value = now + 42 * 60 # 21 minutes after previous one
- sampler_info = self._get_state_sampler_info_for_lull(21 * 60)
- worker._log_lull_sampler_info(sampler_info)
- log_full_thread_dump.assert_called_once_with()
-
class CachingStateHandlerTest(unittest.TestCase):
def test_caching(self):
diff --git a/sdks/python/apache_beam/runners/worker/worker_status.py b/sdks/python/apache_beam/runners/worker/worker_status.py
index b699f07..652b01d 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status.py
@@ -17,9 +17,11 @@
"""Worker status api handler for reporting SDK harness debug info."""
+import logging
import queue
import sys
import threading
+import time
import traceback
from collections import defaultdict
@@ -36,6 +38,19 @@ try:
except ImportError:
hpy = None
+_LOGGER = logging.getLogger(__name__)
+
+# This SDK harness will (by default), log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+# Full thread dump is performed at most every 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
+
+# Full thread dump is performed if the lull is more than 20 minutes.
+LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
+
def thread_dump():
"""Get a thread dump for the current SDK worker harness. """
@@ -120,7 +135,11 @@ DONE = Sentinel.sentinel
class FnApiWorkerStatusHandler(object):
"""FnApiWorkerStatusHandler handles worker status request from Runner. """
def __init__(
- self, status_address, bundle_process_cache=None, enable_heap_dump=False):
+ self,
+ status_address,
+ bundle_process_cache=None,
+ enable_heap_dump=False,
+ log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
"""Initialize FnApiWorkerStatusHandler.
Args:
@@ -135,11 +154,20 @@ class FnApiWorkerStatusHandler(object):
self._status_stub = beam_fn_api_pb2_grpc.BeamFnWorkerStatusStub(
self._status_channel)
self._responses = queue.Queue()
+ self.log_lull_timeout_ns = log_lull_timeout_ns
+ self._last_full_thread_dump_secs = 0.0
+ self._last_lull_logged_secs = 0.0
self._server = threading.Thread(
target=lambda: self._serve(), name='fn_api_status_handler')
self._server.daemon = True
self._enable_heap_dump = enable_heap_dump
self._server.start()
+ self._lull_logger = threading.Thread(
+ target=lambda: self._log_lull_in_bundle_processor(
+ self._bundle_process_cache),
+ name='lull_operation_logger')
+ self._lull_logger.daemon = True
+ self._lull_logger.start()
def _get_responses(self):
while True:
@@ -177,3 +205,49 @@ class FnApiWorkerStatusHandler(object):
def close(self):
self._responses.put(DONE, timeout=5)
+
+ def _log_lull_in_bundle_processor(self, bundle_process_cache):
+ while True:
+ time.sleep(2 * 60)
+ if bundle_process_cache.active_bundle_processors:
+ for instruction in list(
+ bundle_process_cache.active_bundle_processors.keys()):
+ processor = bundle_process_cache.lookup(instruction)
+ if processor:
+ info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(info)
+
+ def _log_lull_sampler_info(self, sampler_info):
+ if not self._passed_lull_timeout_since_last_log():
+ return
+ if (sampler_info and sampler_info.time_since_transition and
+ sampler_info.time_since_transition > self.log_lull_timeout_ns):
+ step_name = sampler_info.state_name.step_name
+ state_name = sampler_info.state_name.name
+ lull_seconds = sampler_info.time_since_transition / 1e9
+ state_lull_log = (
+ 'Operation ongoing for over %.2f seconds in state %s' %
+ (lull_seconds, state_name))
+ step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+ exec_thread = getattr(sampler_info, 'tracked_thread', None)
+ if exec_thread is not None:
+ thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access
+ stack_trace = '\n'.join(
+ traceback.format_stack(thread_frame)) if thread_frame else ''
+ else:
+ stack_trace = '-NOT AVAILABLE-'
+
+ _LOGGER.warning(
+ '%s%s without returning. Current Traceback:\n%s',
+ state_lull_log,
+ step_name_log,
+ stack_trace)
+
+ def _passed_lull_timeout_since_last_log(self) -> bool:
+ if (time.time() - self._last_lull_logged_secs >
+ self.log_lull_timeout_ns / 1e9):
+ self._last_lull_logged_secs = time.time()
+ return True
+ else:
+ return False
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 2b4c7dc..7da5339 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -17,6 +17,7 @@
import logging
import threading
+import time
import unittest
import grpc
@@ -24,9 +25,11 @@ import mock
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
from apache_beam.runners.worker.worker_status import heap_dump
from apache_beam.utils import thread_pool_executor
+from apache_beam.utils.counters import CounterName
class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
@@ -83,6 +86,44 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
self.assertIsNotNone(response.error)
self.fn_status_handler.close()
+ def test_log_lull_in_bundle_processor(self):
+ def get_state_sampler_info_for_lull(lull_duration_s):
+ return statesampler.StateSamplerInfo(
+ CounterName('progress-msecs', 'stage_name', 'step_name'),
+ 1,
+ lull_duration_s * 1e9,
+ threading.current_thread())
+
+ now = time.time()
+ with mock.patch('logging.Logger.warning') as warn_mock:
+ with mock.patch('time.time') as time_mock:
+ time_mock.return_value = now
+ sampler_info = get_state_sampler_info_for_lull(21 * 60)
+ self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+ processing_template = warn_mock.call_args[0][1]
+ step_name_template = warn_mock.call_args[0][2]
+ traceback = warn_mock.call_args = warn_mock.call_args[0][3]
+
+ self.assertIn('progress-msecs', processing_template)
+ self.assertIn('step_name', step_name_template)
+ self.assertIn('test_log_lull_in_bundle_processor', traceback)
+
+ with mock.patch('time.time') as time_mock:
+ time_mock.return_value = now + 6 * 60 # 6 minutes
+ sampler_info = get_state_sampler_info_for_lull(21 * 60)
+ self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+ with mock.patch('time.time') as time_mock:
+ time_mock.return_value = now + 21 * 60 # 21 minutes
+ sampler_info = get_state_sampler_info_for_lull(10 * 60)
+ self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
+ with mock.patch('time.time') as time_mock:
+ time_mock.return_value = now + 42 * 60 # 21 minutes after previous one
+ sampler_info = get_state_sampler_info_for_lull(21 * 60)
+ self.fn_status_handler._log_lull_sampler_info(sampler_info)
+
class HeapDumpTest(unittest.TestCase):
@mock.patch('apache_beam.runners.worker.worker_status.hpy', None)