Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:41 UTC

[beam] 01/02: Adding lull logging for SDK harness

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9bbebaaa6e89c7eb8c092e7bb136c58324c7c046
Author: pabloem <pa...@apache.org>
AuthorDate: Mon Oct 7 18:21:51 2019 -0700

    Adding lull logging for SDK harness
---
 .../runners/portability/fn_api_runner.py           | 14 ++++++---
 .../runners/portability/fn_api_runner_test.py      |  3 +-
 .../apache_beam/runners/worker/sdk_worker.py       | 35 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 78e774f..4b36af8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -316,7 +316,8 @@ class FnApiRunner(runner.PipelineRunner):
       default_environment=None,
       bundle_repeat=0,
       use_state_iterables=False,
-      provision_info=None):
+      provision_info=None,
+      progress_request_frequency=None):
     """Creates a new Fn API Runner.
 
     Args:
@@ -326,6 +327,8 @@ class FnApiRunner(runner.PipelineRunner):
       use_state_iterables: Intentionally split gbk iterables over state API
           (for testing)
       provision_info: provisioning info to make available to workers, or None
+      progress_request_frequency: The interval (in seconds) that the runner
+          waits between progress requests to the SDK.
     """
     super(FnApiRunner, self).__init__()
     self._last_uid = -1
@@ -334,7 +337,7 @@ class FnApiRunner(runner.PipelineRunner):
         or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
     self._bundle_repeat = bundle_repeat
     self._num_workers = 1
-    self._progress_frequency = None
+    self._progress_frequency = progress_request_frequency
     self._profiler_factory = None
     self._use_state_iterables = use_state_iterables
     self._provision_info = provision_info or ExtendedProvisionInfo(
@@ -505,13 +508,14 @@ class FnApiRunner(runner.PipelineRunner):
     for k in range(self._bundle_repeat):
       try:
         worker_handler.state.checkpoint()
-        ParallelBundleManager(
+        testing_bundle_manager = ParallelBundleManager(
             worker_handler_list, lambda pcoll_id: [],
             get_input_coder_callable, process_bundle_descriptor,
             self._progress_frequency, k,
             num_workers=self._num_workers,
             cache_token_generator=cache_token_generator
-        ).process_bundle(data_input, data_output)
+        )
+        testing_bundle_manager.process_bundle(data_input, data_output)
       finally:
         worker_handler.state.restore()
 
@@ -1516,6 +1520,8 @@ class WorkerHandlerManager(object):
         worker_handler = WorkerHandler.create(
             environment, self._state, self._job_provision_info,
             self._grpc_server)
+        logging.info("Created Worker handler %s for environment %s",
+                     worker_handler, environment)
         self._cached_handlers[environment_id].append(worker_handler)
         worker_handler.start_worker()
     return self._cached_handlers[environment_id][:num_workers]
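
For context, a minimal sketch of how the new progress_request_frequency
argument can be passed when constructing the runner directly (assuming the
same import paths and EMBEDDED_PYTHON_GRPC environment already used in
fn_api_runner_test.py below):

    import apache_beam as beam
    from apache_beam.portability import python_urns
    from apache_beam.portability.api import beam_runner_api_pb2
    from apache_beam.runners.portability import fn_api_runner

    # Poll the SDK harness for progress every second; each progress request
    # also gives the harness a chance to check for (and log) processing lulls.
    runner = fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.EMBEDDED_PYTHON_GRPC),
        progress_request_frequency=1)

    with beam.Pipeline(runner=runner) as p:
      _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
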
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 125b856..d50e9e8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -1166,7 +1166,8 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
             default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+                urn=python_urns.EMBEDDED_PYTHON_GRPC),
+            progress_request_frequency=1))
 
 
 class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 9bcfe97..d5bb2c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -47,6 +47,12 @@ from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 
 
+# This SDK harness will (by default) log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+
 class SdkHarness(object):
   REQUEST_METHOD_PREFIX = '_request_'
   SCHEDULING_DELAY_THRESHOLD_SEC = 5*60  # 5 Minutes
@@ -339,9 +345,11 @@ class BundleProcessorCache(object):
 class SdkWorker(object):
 
   def __init__(self, bundle_processor_cache,
-               profiler_factory=None):
+               profiler_factory=None,
+               log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
     self.bundle_processor_cache = bundle_processor_cache
     self.profiler_factory = profiler_factory
+    self.log_lull_timeout_ns = log_lull_timeout_ns
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -403,10 +411,35 @@ class SdkWorker(object):
           instruction_id=instruction_id,
           error='Instruction not running: %s' % instruction_id)
 
+  def _log_lull_in_bundle_processor(self, processor):
+    state_sampler = processor.state_sampler
+    sampler_info = state_sampler.get_info()
+    if (sampler_info
+        and sampler_info.time_since_transition
+        and sampler_info.time_since_transition > self.log_lull_timeout_ns):
+      step_name = sampler_info.state_name.step_name
+      state_name = sampler_info.state_name.name
+      state_lull_log = (
+          'There has been a processing lull of over %.2f seconds in state %s'
+          % (sampler_info.time_since_transition / 1e9, state_name))
+      step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+      exec_thread = getattr(sampler_info, 'tracked_thread', None)
+      if exec_thread is not None:
+        thread_frame = sys._current_frames().get(exec_thread.ident)  # pylint: disable=protected-access
+        stack_trace = '\n'.join(
+            traceback.format_stack(thread_frame)) if thread_frame else ''
+      else:
+        stack_trace = '-NOT AVAILABLE-'
+
+      logging.warning(
+          '%s%s. Traceback:\n%s', state_lull_log, step_name_log, stack_trace)
+
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
     processor = self.bundle_processor_cache.lookup(
         request.instruction_id)
+    self._log_lull_in_bundle_processor(processor)
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
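
To make the lull check concrete, here is a self-contained sketch of the same
idea: if a thread has not changed state for longer than a nanosecond
threshold, look up its current frame via sys._current_frames() and log the
formatted stack. The helper name log_lull and its two arguments are
illustrative stand-ins for the state-sampler fields (tracked_thread,
time_since_transition) read in the diff above.

    import logging
    import sys
    import threading
    import time
    import traceback

    # 5 minutes in nanoseconds, mirroring DEFAULT_LOG_LULL_TIMEOUT_NS above.
    LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000


    def log_lull(tracked_thread, time_since_transition_ns):
      """Logs the tracked thread's stack if the lull exceeds the timeout."""
      if time_since_transition_ns <= LULL_TIMEOUT_NS:
        return
      frame = sys._current_frames().get(tracked_thread.ident)  # pylint: disable=protected-access
      if frame:
        stack_trace = ''.join(traceback.format_stack(frame))
      else:
        stack_trace = '-NOT AVAILABLE-'
      logging.warning(
          'Processing lull of over %.2f seconds. Traceback:\n%s',
          time_since_transition_ns / 1e9, stack_trace)


    if __name__ == '__main__':
      worker = threading.Thread(target=time.sleep, args=(10,))
      worker.start()
      time.sleep(1)
      # Pretend the worker thread has been stuck in one state for six minutes.
      log_lull(worker, 6 * 60 * 1000 * 1000 * 1000)
      worker.join()
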