You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:42 UTC

[beam] 02/02: Trying to add a test case

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7af4b649a545e93e1a7923c048c42b6a9dbed9a0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Oct 8 11:36:14 2019 -0700

    Trying to add a test case
---
 .../runners/portability/fn_api_runner_test.py      | 30 ++++++++++++++++++++--
 .../apache_beam/runners/worker/sdk_worker.py       | 17 +++++++-----
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index d50e9e8..35fa688 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -22,6 +22,7 @@ import logging
 import os
 import random
 import shutil
+import sys
 import tempfile
 import threading
 import time
@@ -49,6 +50,7 @@ from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.worker import data_plane
+from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statesampler
 from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
 from apache_beam.testing.test_stream import TestStream
@@ -1166,8 +1168,7 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
             default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC),
-            progress_request_frequency=1))
+                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
 
 
 class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1581,6 +1582,46 @@ class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
     raise unittest.SkipTest("This test is for a single worker only.")
 
 
+class FnApiBasedLullLoggingTest(unittest.TestCase):
+  """Checks that the SDK worker warns about a processing lull."""
+
+  def create_pipeline(self):
+    # Enable periodic progress requests: the SDK worker checks for a
+    # processing lull while handling them (process_bundle_progress).
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(
+            default_environment=beam_runner_api_pb2.Environment(
+                urn=python_urns.EMBEDDED_PYTHON_GRPC),
+            progress_request_frequency=1))
+
+  def test_lull_logging(self):
+    # TestCase.assertLogs was added in Python 3.4.
+    if sys.version_info < (3, 4):
+      self.skipTest("Log-based assertions are supported after Python 3.4")
+
+    # SdkWorker falls back to this module-level default when constructed.
+    # Lower it to 1 msec so the sleeping elements below trigger a lull
+    # warning, and always restore it so other tests are not affected.
+    original_timeout_ns = sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS
+    sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000
+    try:
+      with self.assertLogs('apache_beam.runners.worker.sdk_worker',
+                           level='WARNING') as logs:
+        with self.create_pipeline() as p:
+          # Each element sleeps for that many seconds.
+          (p
+           | beam.Create([1, 2, 3])
+           | beam.Map(time.sleep))
+    finally:
+      sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = original_timeout_ns
+
+    # Other warnings may be logged too, so search all captured records.
+    self.assertTrue(
+        any('There has been a processing lull' in line
+            for line in logs.output),
+        logs.output)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d5bb2c1..ddc5200 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -344,12 +344,18 @@ class BundleProcessorCache(object):
 
 class SdkWorker(object):
 
-  def __init__(self, bundle_processor_cache,
+  def __init__(self,
+               bundle_processor_cache,
                profiler_factory=None,
-               log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
+               log_lull_timeout_ns=None):
     self.bundle_processor_cache = bundle_processor_cache
     self.profiler_factory = profiler_factory
-    self.log_lull_timeout_ns = log_lull_timeout_ns
+    # Resolve the default at call time rather than at definition time, so
+    # the current value of DEFAULT_LOG_LULL_TIMEOUT_NS is used (e.g. a test
+    # can lower it). Compare against None so an explicit 0 is kept.
+    if log_lull_timeout_ns is None:
+      log_lull_timeout_ns = DEFAULT_LOG_LULL_TIMEOUT_NS
+    self.log_lull_timeout_ns = log_lull_timeout_ns
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -414,6 +417,9 @@ class SdkWorker(object):
   def _log_lull_in_bundle_processor(self, processor):
     state_sampler = processor.state_sampler
     sampler_info = state_sampler.get_info()
+    # Debug-level tracing only: this runs for every progress request.
+    logging.debug("State sampler %s with info %s, lull timeout: %s",
+                  state_sampler, sampler_info, self.log_lull_timeout_ns)
     if (sampler_info
         and sampler_info.time_since_transition
         and sampler_info.time_since_transition > self.log_lull_timeout_ns):
@@ -437,9 +442,9 @@ class SdkWorker(object):
 
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
-    processor = self.bundle_processor_cache.lookup(
-        request.instruction_id)
-    self._log_lull_in_bundle_processor(processor)
+    processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    if processor:
+      self._log_lull_in_bundle_processor(processor)
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(