You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:42 UTC
[beam] 02/02: Trying to add a test case
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7af4b649a545e93e1a7923c048c42b6a9dbed9a0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Oct 8 11:36:14 2019 -0700
Trying to add a test case
---
.../runners/portability/fn_api_runner_test.py | 30 ++++++++++++++++++++--
.../apache_beam/runners/worker/sdk_worker.py | 17 +++++++-----
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index d50e9e8..35fa688 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -22,6 +22,7 @@ import logging
import os
import random
import shutil
+import sys
import tempfile
import threading
import time
@@ -49,6 +50,7 @@ from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.worker import data_plane
+from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statesampler
from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
from apache_beam.testing.test_stream import TestStream
@@ -1166,8 +1168,7 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC),
- progress_request_frequency=1))
+ urn=python_urns.EMBEDDED_PYTHON_GRPC)))
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1581,6 +1582,31 @@ class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
raise unittest.SkipTest("This test is for a single worker only.")
+class FnApiBasedLullLoggingTest(unittest.TestCase):
+ def create_pipeline(self):
+ return beam.Pipeline(
+ runner=fn_api_runner.FnApiRunner(
+ default_environment=beam_runner_api_pb2.Environment(
+ urn=python_urns.EMBEDDED_PYTHON_GRPC),
+ progress_request_frequency=1))
+
+ def test_lull_logging(self):
+
+ if sys.version_info < (3, 4):
+ self.skipTest("Log-based assertions are supported after Python 3.4")
+
+ with self.assertLogs('apache_beam.runners.worker.sdk_worker',
+ level='WARNING') as logs:
+ with self.create_pipeline() as p:
+ sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000 # Lull after 1 msec
+
+ (p
+ | beam.Create([1, 2, 3])
+ | beam.Map(time.sleep))
+
+ self.assertRegex(logs.output[0], 'There has been a processing lull.*')
+ print(logs)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d5bb2c1..ddc5200 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -344,12 +344,15 @@ class BundleProcessorCache(object):
class SdkWorker(object):
- def __init__(self, bundle_processor_cache,
+
+ def __init__(self,
+ bundle_processor_cache,
profiler_factory=None,
- log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
+ log_lull_timeout_ns=None):
self.bundle_processor_cache = bundle_processor_cache
self.profiler_factory = profiler_factory
- self.log_lull_timeout_ns = log_lull_timeout_ns
+ self.log_lull_timeout_ns = (log_lull_timeout_ns
+ or DEFAULT_LOG_LULL_TIMEOUT_NS)
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -414,6 +417,8 @@ class SdkWorker(object):
def _log_lull_in_bundle_processor(self, processor):
state_sampler = processor.state_sampler
sampler_info = state_sampler.get_info()
+ logging.warning("State sampler %s with info %s, lull timeout: %s",
+ state_sampler, sampler_info, self.log_lull_timeout_ns)
if (sampler_info
and sampler_info.time_since_transition
and sampler_info.time_since_transition > self.log_lull_timeout_ns):
@@ -437,9 +442,9 @@ class SdkWorker(object):
def process_bundle_progress(self, request, instruction_id):
# It is an error to get progress for a not-in-flight bundle.
- processor = self.bundle_processor_cache.lookup(
- request.instruction_id)
- self._log_lull_in_bundle_processor(processor)
+ processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ if processor:
+ self._log_lull_in_bundle_processor(processor)
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(