You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:40 UTC

[beam] branch portable-lull created (now 7af4b64)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 7af4b64  Trying to add a test case

This branch includes the following new commits:

     new 9bbebaa  Adding lull logging for SDK harness
     new 7af4b64  Trying to add a test case

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/02: Adding lull logging for SDK harness

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9bbebaaa6e89c7eb8c092e7bb136c58324c7c046
Author: pabloem <pa...@apache.org>
AuthorDate: Mon Oct 7 18:21:51 2019 -0700

    Adding lull logging for SDK harness
---
 .../runners/portability/fn_api_runner.py           | 14 ++++++---
 .../runners/portability/fn_api_runner_test.py      |  3 +-
 .../apache_beam/runners/worker/sdk_worker.py       | 35 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 78e774f..4b36af8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -316,7 +316,8 @@ class FnApiRunner(runner.PipelineRunner):
       default_environment=None,
       bundle_repeat=0,
       use_state_iterables=False,
-      provision_info=None):
+      provision_info=None,
+      progress_request_frequency=None):
     """Creates a new Fn API Runner.
 
     Args:
@@ -326,6 +327,8 @@ class FnApiRunner(runner.PipelineRunner):
       use_state_iterables: Intentionally split gbk iterables over state API
           (for testing)
       provision_info: provisioning info to make available to workers, or None
+      progress_request_frequency: The interval (in seconds) that the runner
+          waits between requests for progress from the SDK.
     """
     super(FnApiRunner, self).__init__()
     self._last_uid = -1
@@ -334,7 +337,7 @@ class FnApiRunner(runner.PipelineRunner):
         or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
     self._bundle_repeat = bundle_repeat
     self._num_workers = 1
-    self._progress_frequency = None
+    self._progress_frequency = progress_request_frequency
     self._profiler_factory = None
     self._use_state_iterables = use_state_iterables
     self._provision_info = provision_info or ExtendedProvisionInfo(
@@ -505,13 +508,14 @@ class FnApiRunner(runner.PipelineRunner):
     for k in range(self._bundle_repeat):
       try:
         worker_handler.state.checkpoint()
-        ParallelBundleManager(
+        testing_bundle_manager = ParallelBundleManager(
             worker_handler_list, lambda pcoll_id: [],
             get_input_coder_callable, process_bundle_descriptor,
             self._progress_frequency, k,
             num_workers=self._num_workers,
             cache_token_generator=cache_token_generator
-        ).process_bundle(data_input, data_output)
+        )
+        testing_bundle_manager.process_bundle(data_input, data_output)
       finally:
         worker_handler.state.restore()
 
@@ -1516,6 +1520,8 @@ class WorkerHandlerManager(object):
         worker_handler = WorkerHandler.create(
             environment, self._state, self._job_provision_info,
             self._grpc_server)
+        logging.info("Created Worker handler %s for environment %s",
+                     worker_handler, environment)
         self._cached_handlers[environment_id].append(worker_handler)
         worker_handler.start_worker()
     return self._cached_handlers[environment_id][:num_workers]
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 125b856..d50e9e8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -1166,7 +1166,8 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
             default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+                urn=python_urns.EMBEDDED_PYTHON_GRPC),
+            progress_request_frequency=1))
 
 
 class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 9bcfe97..d5bb2c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -47,6 +47,12 @@ from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 
 
+# This SDK harness will, by default, log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+
 class SdkHarness(object):
   REQUEST_METHOD_PREFIX = '_request_'
   SCHEDULING_DELAY_THRESHOLD_SEC = 5*60  # 5 Minutes
@@ -339,9 +345,11 @@ class BundleProcessorCache(object):
 class SdkWorker(object):
 
   def __init__(self, bundle_processor_cache,
-               profiler_factory=None):
+               profiler_factory=None,
+               log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
     self.bundle_processor_cache = bundle_processor_cache
     self.profiler_factory = profiler_factory
+    self.log_lull_timeout_ns = log_lull_timeout_ns
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -403,10 +411,35 @@ class SdkWorker(object):
           instruction_id=instruction_id,
           error='Instruction not running: %s' % instruction_id)
 
+  def _log_lull_in_bundle_processor(self, processor):
+    state_sampler = processor.state_sampler
+    sampler_info = state_sampler.get_info()
+    if (sampler_info
+        and sampler_info.time_since_transition
+        and sampler_info.time_since_transition > self.log_lull_timeout_ns):
+      step_name = sampler_info.state_name.step_name
+      state_name = sampler_info.state_name.name
+      state_lull_log = (
+          'There has been a processing lull of over %.2f seconds in state %s'
+          % (sampler_info.time_since_transition / 1e9, state_name))
+      step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+      exec_thread = getattr(sampler_info, 'tracked_thread', None)
+      if exec_thread is not None:
+        thread_frame = sys._current_frames().get(exec_thread.ident)  # pylint: disable=protected-access
+        stack_trace = '\n'.join(
+            traceback.format_stack(thread_frame)) if thread_frame else ''
+      else:
+        stack_trace = '-NOT AVAILABLE-'
+
+      logging.warning(
+          '%s%s. Traceback:\n%s', state_lull_log, step_name_log, stack_trace)
+
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
     processor = self.bundle_processor_cache.lookup(
         request.instruction_id)
+    self._log_lull_in_bundle_processor(processor)
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(


[beam] 02/02: Trying to add a test case

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7af4b649a545e93e1a7923c048c42b6a9dbed9a0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Oct 8 11:36:14 2019 -0700

    Trying to add a test case
---
 .../runners/portability/fn_api_runner_test.py      | 30 ++++++++++++++++++++--
 .../apache_beam/runners/worker/sdk_worker.py       | 17 +++++++-----
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index d50e9e8..35fa688 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -22,6 +22,7 @@ import logging
 import os
 import random
 import shutil
+import sys
 import tempfile
 import threading
 import time
@@ -49,6 +50,7 @@ from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.worker import data_plane
+from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statesampler
 from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
 from apache_beam.testing.test_stream import TestStream
@@ -1166,8 +1168,7 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
             default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC),
-            progress_request_frequency=1))
+                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
 
 
 class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1581,6 +1582,31 @@ class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
     raise unittest.SkipTest("This test is for a single worker only.")
 
 
+class FnApiBasedLullLoggingTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(
+            default_environment=beam_runner_api_pb2.Environment(
+                urn=python_urns.EMBEDDED_PYTHON_GRPC),
+            progress_request_frequency=1))
+
+  def test_lull_logging(self):
+
+    if sys.version_info < (3, 4):
+      self.skipTest("Log-based assertions are supported after Python 3.4")
+
+    with self.assertLogs('apache_beam.runners.worker.sdk_worker',
+                         level='WARNING') as logs:
+      with self.create_pipeline() as p:
+        sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000  # Lull after 1 msec
+
+        (p
+         | beam.Create([1, 2, 3])
+         | beam.Map(time.sleep))
+
+      self.assertRegex(logs.output[0], 'There has been a processing lull.*')
+      print(logs)
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d5bb2c1..ddc5200 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -344,12 +344,15 @@ class BundleProcessorCache(object):
 
 class SdkWorker(object):
 
-  def __init__(self, bundle_processor_cache,
+
+  def __init__(self,
+               bundle_processor_cache,
                profiler_factory=None,
-               log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
+               log_lull_timeout_ns=None):
     self.bundle_processor_cache = bundle_processor_cache
     self.profiler_factory = profiler_factory
-    self.log_lull_timeout_ns = log_lull_timeout_ns
+    self.log_lull_timeout_ns = (log_lull_timeout_ns
+                                or DEFAULT_LOG_LULL_TIMEOUT_NS)
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -414,6 +417,8 @@ class SdkWorker(object):
   def _log_lull_in_bundle_processor(self, processor):
     state_sampler = processor.state_sampler
     sampler_info = state_sampler.get_info()
+    logging.warning("State sampler %s with info %s, lull timeout: %s",
+                    state_sampler, sampler_info, self.log_lull_timeout_ns)
     if (sampler_info
         and sampler_info.time_since_transition
         and sampler_info.time_since_transition > self.log_lull_timeout_ns):
@@ -437,9 +442,9 @@ class SdkWorker(object):
 
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
-    processor = self.bundle_processor_cache.lookup(
-        request.instruction_id)
-    self._log_lull_in_bundle_processor(processor)
+    processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    if processor:
+      self._log_lull_in_bundle_processor(processor)
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(