You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:41 UTC
[beam] 01/02: Adding lull logging for SDK harness
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9bbebaaa6e89c7eb8c092e7bb136c58324c7c046
Author: pabloem <pa...@apache.org>
AuthorDate: Mon Oct 7 18:21:51 2019 -0700
Adding lull logging for SDK harness
---
.../runners/portability/fn_api_runner.py | 14 ++++++---
.../runners/portability/fn_api_runner_test.py | 3 +-
.../apache_beam/runners/worker/sdk_worker.py | 35 +++++++++++++++++++++-
3 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 78e774f..4b36af8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -316,7 +316,8 @@ class FnApiRunner(runner.PipelineRunner):
default_environment=None,
bundle_repeat=0,
use_state_iterables=False,
- provision_info=None):
+ provision_info=None,
+ progress_request_frequency=None):
"""Creates a new Fn API Runner.
Args:
@@ -326,6 +327,8 @@ class FnApiRunner(runner.PipelineRunner):
use_state_iterables: Intentionally split gbk iterables over state API
(for testing)
provision_info: provisioning info to make available to workers, or None
+ progress_request_frequency: The frequency (in seconds) that the runner
+ waits before requesting progress from the SDK.
"""
super(FnApiRunner, self).__init__()
self._last_uid = -1
@@ -334,7 +337,7 @@ class FnApiRunner(runner.PipelineRunner):
or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
self._bundle_repeat = bundle_repeat
self._num_workers = 1
- self._progress_frequency = None
+ self._progress_frequency = progress_request_frequency
self._profiler_factory = None
self._use_state_iterables = use_state_iterables
self._provision_info = provision_info or ExtendedProvisionInfo(
@@ -505,13 +508,14 @@ class FnApiRunner(runner.PipelineRunner):
for k in range(self._bundle_repeat):
try:
worker_handler.state.checkpoint()
- ParallelBundleManager(
+ testing_bundle_manager = ParallelBundleManager(
worker_handler_list, lambda pcoll_id: [],
get_input_coder_callable, process_bundle_descriptor,
self._progress_frequency, k,
num_workers=self._num_workers,
cache_token_generator=cache_token_generator
- ).process_bundle(data_input, data_output)
+ )
+ testing_bundle_manager.process_bundle(data_input, data_output)
finally:
worker_handler.state.restore()
@@ -1516,6 +1520,8 @@ class WorkerHandlerManager(object):
worker_handler = WorkerHandler.create(
environment, self._state, self._job_provision_info,
self._grpc_server)
+ logging.info("Created Worker handler %s for environment %s",
+ worker_handler, environment)
self._cached_handlers[environment_id].append(worker_handler)
worker_handler.start_worker()
return self._cached_handlers[environment_id][:num_workers]
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 125b856..d50e9e8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -1166,7 +1166,8 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+ urn=python_urns.EMBEDDED_PYTHON_GRPC),
+ progress_request_frequency=1))
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 9bcfe97..d5bb2c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -47,6 +47,12 @@ from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
+# This SDK harness will (by default) log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
SCHEDULING_DELAY_THRESHOLD_SEC = 5*60 # 5 Minutes
@@ -339,9 +345,11 @@ class BundleProcessorCache(object):
class SdkWorker(object):
def __init__(self, bundle_processor_cache,
- profiler_factory=None):
+ profiler_factory=None,
+ log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
self.bundle_processor_cache = bundle_processor_cache
self.profiler_factory = profiler_factory
+ self.log_lull_timeout_ns = log_lull_timeout_ns
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -403,10 +411,35 @@ class SdkWorker(object):
instruction_id=instruction_id,
error='Instruction not running: %s' % instruction_id)
+ def _log_lull_in_bundle_processor(self, processor):
+ state_sampler = processor.state_sampler
+ sampler_info = state_sampler.get_info()
+ if (sampler_info
+ and sampler_info.time_since_transition
+ and sampler_info.time_since_transition > self.log_lull_timeout_ns):
+ step_name = sampler_info.state_name.step_name
+ state_name = sampler_info.state_name.name
+ state_lull_log = (
+ 'There has been a processing lull of over %.2f seconds in state %s'
+ % (sampler_info.time_since_transition / 1e9, state_name))
+ step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+ exec_thread = getattr(sampler_info, 'tracked_thread', None)
+ if exec_thread is not None:
+ thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access
+ stack_trace = '\n'.join(
+ traceback.format_stack(thread_frame)) if thread_frame else ''
+ else:
+ stack_trace = '-NOT AVAILABLE-'
+
+ logging.warning(
+ '%s%s. Traceback:\n%s', state_lull_log, step_name_log, stack_trace)
+
def process_bundle_progress(self, request, instruction_id):
# It is an error to get progress for a not-in-flight bundle.
processor = self.bundle_processor_cache.lookup(
request.instruction_id)
+ self._log_lull_in_bundle_processor(processor)
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(