You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/10/08 18:37:40 UTC
[beam] branch portable-lull created (now 7af4b64)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a change to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git.
at 7af4b64 Trying to add a test case
This branch includes the following new commits:
new 9bbebaa Adding lull logging for SDK harness
new 7af4b64 Trying to add a test case
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[beam] 01/02: Adding lull logging for SDK harness
Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9bbebaaa6e89c7eb8c092e7bb136c58324c7c046
Author: pabloem <pa...@apache.org>
AuthorDate: Mon Oct 7 18:21:51 2019 -0700
Adding lull logging for SDK harness
---
.../runners/portability/fn_api_runner.py | 14 ++++++---
.../runners/portability/fn_api_runner_test.py | 3 +-
.../apache_beam/runners/worker/sdk_worker.py | 35 +++++++++++++++++++++-
3 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 78e774f..4b36af8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -316,7 +316,8 @@ class FnApiRunner(runner.PipelineRunner):
default_environment=None,
bundle_repeat=0,
use_state_iterables=False,
- provision_info=None):
+ provision_info=None,
+ progress_request_frequency=None):
"""Creates a new Fn API Runner.
Args:
@@ -326,6 +327,8 @@ class FnApiRunner(runner.PipelineRunner):
use_state_iterables: Intentionally split gbk iterables over state API
(for testing)
provision_info: provisioning info to make available to workers, or None
+ progress_request_frequency: The frequency (in seconds) that the runner
+ waits before requesting progress from the SDK.
"""
super(FnApiRunner, self).__init__()
self._last_uid = -1
@@ -334,7 +337,7 @@ class FnApiRunner(runner.PipelineRunner):
or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
self._bundle_repeat = bundle_repeat
self._num_workers = 1
- self._progress_frequency = None
+ self._progress_frequency = progress_request_frequency
self._profiler_factory = None
self._use_state_iterables = use_state_iterables
self._provision_info = provision_info or ExtendedProvisionInfo(
@@ -505,13 +508,14 @@ class FnApiRunner(runner.PipelineRunner):
for k in range(self._bundle_repeat):
try:
worker_handler.state.checkpoint()
- ParallelBundleManager(
+ testing_bundle_manager = ParallelBundleManager(
worker_handler_list, lambda pcoll_id: [],
get_input_coder_callable, process_bundle_descriptor,
self._progress_frequency, k,
num_workers=self._num_workers,
cache_token_generator=cache_token_generator
- ).process_bundle(data_input, data_output)
+ )
+ testing_bundle_manager.process_bundle(data_input, data_output)
finally:
worker_handler.state.restore()
@@ -1516,6 +1520,8 @@ class WorkerHandlerManager(object):
worker_handler = WorkerHandler.create(
environment, self._state, self._job_provision_info,
self._grpc_server)
+ logging.info("Created Worker handler %s for environment %s",
+ worker_handler, environment)
self._cached_handlers[environment_id].append(worker_handler)
worker_handler.start_worker()
return self._cached_handlers[environment_id][:num_workers]
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 125b856..d50e9e8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -1166,7 +1166,8 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+ urn=python_urns.EMBEDDED_PYTHON_GRPC),
+ progress_request_frequency=1))
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 9bcfe97..d5bb2c1 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -47,6 +47,12 @@ from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
+# This SDK harness will (by default), log a "lull" in processing if it sees no
+# transitions in over 5 minutes.
+# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
+DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000
+
+
class SdkHarness(object):
REQUEST_METHOD_PREFIX = '_request_'
SCHEDULING_DELAY_THRESHOLD_SEC = 5*60 # 5 Minutes
@@ -339,9 +345,11 @@ class BundleProcessorCache(object):
class SdkWorker(object):
def __init__(self, bundle_processor_cache,
- profiler_factory=None):
+ profiler_factory=None,
+ log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
self.bundle_processor_cache = bundle_processor_cache
self.profiler_factory = profiler_factory
+ self.log_lull_timeout_ns = log_lull_timeout_ns
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -403,10 +411,35 @@ class SdkWorker(object):
instruction_id=instruction_id,
error='Instruction not running: %s' % instruction_id)
+ def _log_lull_in_bundle_processor(self, processor):
+ state_sampler = processor.state_sampler
+ sampler_info = state_sampler.get_info()
+ if (sampler_info
+ and sampler_info.time_since_transition
+ and sampler_info.time_since_transition > self.log_lull_timeout_ns):
+ step_name = sampler_info.state_name.step_name
+ state_name = sampler_info.state_name.name
+ state_lull_log = (
+ 'There has been a processing lull of over %.2f seconds in state %s'
+ % (sampler_info.time_since_transition / 1e9, state_name))
+ step_name_log = (' in step %s ' % step_name) if step_name else ''
+
+ exec_thread = getattr(sampler_info, 'tracked_thread', None)
+ if exec_thread is not None:
+ thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access
+ stack_trace = '\n'.join(
+ traceback.format_stack(thread_frame)) if thread_frame else ''
+ else:
+ stack_trace = '-NOT AVAILABLE-'
+
+ logging.warning(
+ '%s%s. Traceback:\n%s', state_lull_log, step_name_log, stack_trace)
+
def process_bundle_progress(self, request, instruction_id):
# It is an error to get progress for a not-in-flight bundle.
processor = self.bundle_processor_cache.lookup(
request.instruction_id)
+ self._log_lull_in_bundle_processor(processor)
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
[beam] 02/02: Trying to add a test case
Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch portable-lull
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7af4b649a545e93e1a7923c048c42b6a9dbed9a0
Author: pabloem <pa...@apache.org>
AuthorDate: Tue Oct 8 11:36:14 2019 -0700
Trying to add a test case
---
.../runners/portability/fn_api_runner_test.py | 30 ++++++++++++++++++++--
.../apache_beam/runners/worker/sdk_worker.py | 17 +++++++-----
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index d50e9e8..35fa688 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -22,6 +22,7 @@ import logging
import os
import random
import shutil
+import sys
import tempfile
import threading
import time
@@ -49,6 +50,7 @@ from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.worker import data_plane
+from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statesampler
from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
from apache_beam.testing.test_stream import TestStream
@@ -1166,8 +1168,7 @@ class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC),
- progress_request_frequency=1))
+ urn=python_urns.EMBEDDED_PYTHON_GRPC)))
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1581,6 +1582,31 @@ class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
raise unittest.SkipTest("This test is for a single worker only.")
+class FnApiBasedLullLoggingTest(unittest.TestCase):
+ def create_pipeline(self):
+ return beam.Pipeline(
+ runner=fn_api_runner.FnApiRunner(
+ default_environment=beam_runner_api_pb2.Environment(
+ urn=python_urns.EMBEDDED_PYTHON_GRPC),
+ progress_request_frequency=1))
+
+ def test_lull_logging(self):
+
+ if sys.version_info < (3, 4):
+ self.skipTest("Log-based assertions are supported after Python 3.4")
+
+ with self.assertLogs('apache_beam.runners.worker.sdk_worker',
+ level='WARNING') as logs:
+ with self.create_pipeline() as p:
+ sdk_worker.DEFAULT_LOG_LULL_TIMEOUT_NS = 1000 * 1000 # Lull after 1 msec
+
+ (p
+ | beam.Create([1, 2, 3])
+ | beam.Map(time.sleep))
+
+ self.assertRegex(logs.output[0], 'There has been a processing lull.*')
+ print(logs)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d5bb2c1..ddc5200 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -344,12 +344,15 @@ class BundleProcessorCache(object):
class SdkWorker(object):
- def __init__(self, bundle_processor_cache,
+
+ def __init__(self,
+ bundle_processor_cache,
profiler_factory=None,
- log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
+ log_lull_timeout_ns=None):
self.bundle_processor_cache = bundle_processor_cache
self.profiler_factory = profiler_factory
- self.log_lull_timeout_ns = log_lull_timeout_ns
+ self.log_lull_timeout_ns = (log_lull_timeout_ns
+ or DEFAULT_LOG_LULL_TIMEOUT_NS)
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -414,6 +417,8 @@ class SdkWorker(object):
def _log_lull_in_bundle_processor(self, processor):
state_sampler = processor.state_sampler
sampler_info = state_sampler.get_info()
+ logging.warning("State sampler %s with info %s, lull timeout: %s",
+ state_sampler, sampler_info, self.log_lull_timeout_ns)
if (sampler_info
and sampler_info.time_since_transition
and sampler_info.time_since_transition > self.log_lull_timeout_ns):
@@ -437,9 +442,9 @@ class SdkWorker(object):
def process_bundle_progress(self, request, instruction_id):
# It is an error to get progress for a not-in-flight bundle.
- processor = self.bundle_processor_cache.lookup(
- request.instruction_id)
- self._log_lull_in_bundle_processor(processor)
+ processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ if processor:
+ self._log_lull_in_bundle_processor(processor)
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(