You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/09/25 21:13:54 UTC
[beam] branch master updated: [BEAM-10959] Store a fixed amount of
known process bundle instructions to prevent failures due to concurrency
within a runner. (#12934)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3840369 [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner. (#12934)
3840369 is described below
commit 38403698934c422968320a0bdd834e8b9ae598a3
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Fri Sep 25 14:13:21 2020 -0700
[BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner. (#12934)
* [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner.
* fixup! Fix typo in typing information
* fixup! Address PR comments
---
.../runners/portability/fn_api_runner/fn_runner.py | 14 +-
.../apache_beam/runners/worker/sdk_worker.py | 172 +++++++++++++++------
.../apache_beam/runners/worker/sdk_worker_test.py | 92 +++++++++++
3 files changed, 225 insertions(+), 53 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 4ce14e2..9c43f33 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -793,12 +793,15 @@ class BundleManager(object):
split_response = self._worker_handler.control_conn.push(
split_request).get() # type: beam_fn_api_pb2.InstructionResponse
for t in (0.05, 0.1, 0.2):
- waiting = ('Instruction not running', 'not yet scheduled')
- if any(msg in split_response.error for msg in waiting):
+ if ('Unknown process bundle' in split_response.error or
+ split_response.process_bundle_split ==
+ beam_fn_api_pb2.ProcessBundleSplitResponse()):
time.sleep(t)
split_response = self._worker_handler.control_conn.push(
split_request).get()
- if 'Unknown process bundle' in split_response.error:
+ if ('Unknown process bundle' in split_response.error or
+ split_response.process_bundle_split ==
+ beam_fn_api_pb2.ProcessBundleSplitResponse()):
# It may have finished too fast.
split_result = None
elif split_response.error:
@@ -894,7 +897,10 @@ class BundleManager(object):
finalize_request = beam_fn_api_pb2.InstructionRequest(
finalize_bundle=beam_fn_api_pb2.FinalizeBundleRequest(
instruction_id=process_bundle_id))
- self._worker_handler.control_conn.push(finalize_request)
+ finalize_response = self._worker_handler.control_conn.push(
+ finalize_request).get()
+ if finalize_response.error:
+ raise RuntimeError(finalize_response.error)
return result, split_results
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 1a8e093..b6070dc 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -87,6 +87,13 @@ LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
# Full thread dump is performed if the lull is more than 20 minutes.
LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
+# The number of ProcessBundleRequest instruction ids the BundleProcessorCache
+# will remember for instructions that are not running.
+MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
+# The number of ProcessBundleRequest instruction ids that BundleProcessorCache
+# will remember for failed instructions.
+MAX_FAILED_INSTRUCTIONS = 10000
+
class ShortIdCache(object):
""" Cache for MonitoringInfo "short ids"
@@ -274,6 +281,7 @@ class SdkHarness(object):
def _request_process_bundle(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> None
+ self._bundle_processor_cache.activate(request.instruction_id)
self._request_execute(request)
def _request_process_bundle_split(self, request):
@@ -286,22 +294,9 @@ class SdkHarness(object):
def _request_process_bundle_action(self, request):
# type: (beam_fn_api_pb2.InstructionRequest) -> None
-
def task():
- instruction_id = getattr(
- request, request.WhichOneof('request')).instruction_id
- # only process progress/split request when a bundle is in processing.
- if (instruction_id in
- self._bundle_processor_cache.active_bundle_processors):
- self._execute(
- lambda: self.create_worker().do_instruction(request), request)
- else:
- self._execute(
- lambda: beam_fn_api_pb2.InstructionResponse(
- instruction_id=request.instruction_id,
- error=('Unknown process bundle instruction {}').format(
- instruction_id)),
- request)
+ self._execute(
+ lambda: self.create_worker().do_instruction(request), request)
self._report_progress_executor.submit(task)
@@ -354,6 +349,8 @@ class BundleProcessorCache(object):
self.fns = fns
self.state_handler_factory = state_handler_factory
self.data_channel_factory = data_channel_factory
+ self.known_not_running_instruction_ids = collections.OrderedDict()
+ self.failed_instruction_ids = collections.OrderedDict()
self.active_bundle_processors = {
} # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
self.cached_bundle_processors = collections.defaultdict(
@@ -361,6 +358,7 @@ class BundleProcessorCache(object):
self.last_access_times = collections.defaultdict(
float) # type: DefaultDict[str, float]
self._schedule_periodic_shutdown()
+ self._lock = threading.Lock()
def register(self, bundle_descriptor):
# type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None
@@ -368,6 +366,17 @@ class BundleProcessorCache(object):
"""Register a ``beam_fn_api_pb2.ProcessBundleDescriptor`` by its id."""
self.fns[bundle_descriptor.id] = bundle_descriptor
+ def activate(self, instruction_id):
+ # type: (str) -> None
+
+ """Makes the ``instruction_id`` known to the bundle processor.
+
+ Allows ``lookup`` to return ``None``. Necessary if ``lookup`` can occur
+ before ``get``.
+ """
+ with self._lock:
+ self.known_not_running_instruction_ids[instruction_id] = True
+
def get(self, instruction_id, bundle_descriptor_id):
# type: (str, str) -> bundle_processor.BundleProcessor
@@ -376,17 +385,37 @@ class BundleProcessorCache(object):
Moves the ``BundleProcessor`` from the inactive to the active cache.
"""
- try:
- # pop() is threadsafe
- processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
- except IndexError:
- processor = bundle_processor.BundleProcessor(
- self.fns[bundle_descriptor_id],
- self.state_handler_factory.create_state_handler(
- self.fns[bundle_descriptor_id].state_api_service_descriptor),
- self.data_channel_factory)
- self.active_bundle_processors[
+ with self._lock:
+ try:
+ # pop() is threadsafe
+ processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
+ self.active_bundle_processors[
+ instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may not have been pre-registered before execution
+ # since activate() may never have been invoked
+ pass
+ return processor
+ except IndexError:
+ pass
+
+ # Make sure we instantiate the processor while not holding the lock.
+ processor = bundle_processor.BundleProcessor(
+ self.fns[bundle_descriptor_id],
+ self.state_handler_factory.create_state_handler(
+ self.fns[bundle_descriptor_id].state_api_service_descriptor),
+ self.data_channel_factory)
+ with self._lock:
+ self.active_bundle_processors[
instruction_id] = bundle_descriptor_id, processor
+ try:
+ del self.known_not_running_instruction_ids[instruction_id]
+ except KeyError:
+ # The instruction may not have been pre-registered before execution
+ # since activate() may never have been invoked
+ pass
return processor
def lookup(self, instruction_id):
@@ -394,17 +423,38 @@ class BundleProcessorCache(object):
"""
Return the requested ``BundleProcessor`` from the cache.
+
+ Will return ``None`` if the BundleProcessor is known but not yet ready. Will
+ raise an error if the ``instruction_id`` is not known or has been discarded.
"""
- return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
+ with self._lock:
+ if instruction_id in self.failed_instruction_ids:
+ raise RuntimeError(
+ 'Bundle processing associated with %s has failed. '
+ 'Check prior failing response for details.' % instruction_id)
+ processor = self.active_bundle_processors.get(
+ instruction_id, (None, None))[-1]
+ if processor:
+ return processor
+ if instruction_id in self.known_not_running_instruction_ids:
+ return None
+ raise RuntimeError('Unknown process bundle id %s.' % instruction_id)
def discard(self, instruction_id):
# type: (str) -> None
"""
- Remove the ``BundleProcessor`` from the cache.
+ Marks the instruction id as failed and shuts down the ``BundleProcessor``.
"""
- self.active_bundle_processors[instruction_id][1].shutdown()
- del self.active_bundle_processors[instruction_id]
+ with self._lock:
+ self.failed_instruction_ids[instruction_id] = True
+ while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
+ self.failed_instruction_ids.popitem(last=False)
+ processor = self.active_bundle_processors[instruction_id][1]
+ del self.active_bundle_processors[instruction_id]
+
+ # Perform the shutdown while not holding the lock.
+ processor.shutdown()
def release(self, instruction_id):
# type: (str) -> None
@@ -415,10 +465,19 @@ class BundleProcessorCache(object):
Resets the ``BundleProcessor`` and moves it from the active to the
inactive cache.
"""
- descriptor_id, processor = self.active_bundle_processors.pop(instruction_id)
+ with self._lock:
+ self.known_not_running_instruction_ids[instruction_id] = True
+ while len(self.known_not_running_instruction_ids
+ ) > MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS:
+ self.known_not_running_instruction_ids.popitem(last=False)
+ descriptor_id, processor = (
+ self.active_bundle_processors.pop(instruction_id))
+
+ # Make sure that we reset the processor while not holding the lock.
processor.reset()
- self.last_access_times[descriptor_id] = time.time()
- self.cached_bundle_processors[descriptor_id].append(processor)
+ with self._lock:
+ self.last_access_times[descriptor_id] = time.time()
+ self.cached_bundle_processors[descriptor_id].append(processor)
def shutdown(self):
"""
@@ -543,15 +602,19 @@ class SdkWorker(object):
instruction_id # type: str
):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
- processor = self.bundle_processor_cache.lookup(request.instruction_id)
- if processor:
- return beam_fn_api_pb2.InstructionResponse(
- instruction_id=instruction_id,
- process_bundle_split=processor.try_split(request))
- else:
+ try:
+ processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ except RuntimeError:
return beam_fn_api_pb2.InstructionResponse(
- instruction_id=instruction_id,
- error='Instruction not running: %s' % instruction_id)
+ instruction_id=instruction_id, error=traceback.format_exc())
+ # Return an empty response if we aren't running. This can happen
+ # if the ProcessBundleRequest has not started or already finished.
+ process_bundle_split = (
+ processor.try_split(request)
+ if processor else beam_fn_api_pb2.ProcessBundleSplitResponse())
+ return beam_fn_api_pb2.InstructionResponse(
+ instruction_id=instruction_id,
+ process_bundle_split=process_bundle_split)
def _log_lull_in_bundle_processor(self, processor):
sampler_info = processor.state_sampler.get_info()
@@ -603,12 +666,18 @@ class SdkWorker(object):
instruction_id # type: str
):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
- # It is an error to get progress for a not-in-flight bundle.
- processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ try:
+ processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ except RuntimeError:
+ return beam_fn_api_pb2.InstructionResponse(
+ instruction_id=instruction_id, error=traceback.format_exc())
if processor:
self._log_lull_in_bundle_processor(processor)
-
- monitoring_infos = processor.monitoring_infos() if processor else []
+ monitoring_infos = processor.monitoring_infos()
+ else:
+ # Return an empty response if we aren't running. This can happen
+ # if the ProcessBundleRequest has not started or already finished.
+ monitoring_infos = []
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
@@ -634,7 +703,11 @@ class SdkWorker(object):
instruction_id # type: str
):
# type: (...) -> beam_fn_api_pb2.InstructionResponse
- processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ try:
+ processor = self.bundle_processor_cache.lookup(request.instruction_id)
+ except RuntimeError:
+ return beam_fn_api_pb2.InstructionResponse(
+ instruction_id=instruction_id, error=traceback.format_exc())
if processor:
try:
finalize_response = processor.finalize_bundle()
@@ -644,10 +717,11 @@ class SdkWorker(object):
except:
self.bundle_processor_cache.discard(request.instruction_id)
raise
- else:
- return beam_fn_api_pb2.InstructionResponse(
- instruction_id=instruction_id,
- error='Instruction not running: %s' % instruction_id)
+ # We can reach this state if there was an erroneous request to finalize
+ # the bundle while it is being initialized or has already been finalized
+ # and released.
+ raise RuntimeError(
+ 'Bundle is not in a finalizable state for %s' % instruction_id)
@contextlib.contextmanager
def maybe_profile(self, instruction_id):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 7f430c3..611ab7e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -32,6 +32,7 @@ from builtins import range
from collections import namedtuple
import grpc
+import hamcrest as hc
import mock
from apache_beam.coders import VarIntCoder
@@ -42,6 +43,7 @@ from apache_beam.portability.api import metrics_pb2
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statecache
from apache_beam.runners.worker import statesampler
+from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
from apache_beam.runners.worker.sdk_worker import CachingStateHandler
from apache_beam.runners.worker.sdk_worker import SdkWorker
from apache_beam.utils import thread_pool_executor
@@ -134,6 +136,96 @@ class SdkWorkerTest(unittest.TestCase):
lull_duration_s * 1e9,
threading.current_thread())
+ def test_inactive_bundle_processor_returns_empty_progress_response(self):
+ bundle_processor = mock.MagicMock()
+ bundle_processor_cache = BundleProcessorCache(None, None, {})
+ bundle_processor_cache.activate('instruction_id')
+ worker = SdkWorker(bundle_processor_cache)
+ split_request = beam_fn_api_pb2.InstructionRequest(
+ instruction_id='progress_instruction_id',
+ process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressRequest(
+ instruction_id='instruction_id'))
+ self.assertEqual(
+ worker.do_instruction(split_request),
+ beam_fn_api_pb2.InstructionResponse(
+ instruction_id='progress_instruction_id',
+ process_bundle_progress=beam_fn_api_pb2.
+ ProcessBundleProgressResponse()))
+
+ # Add a mock bundle processor as if it was running before it's released
+ bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+ 'descriptor_id', bundle_processor)
+ bundle_processor_cache.release('instruction_id')
+ self.assertEqual(
+ worker.do_instruction(split_request),
+ beam_fn_api_pb2.InstructionResponse(
+ instruction_id='progress_instruction_id',
+ process_bundle_progress=beam_fn_api_pb2.
+ ProcessBundleProgressResponse()))
+
+ def test_failed_bundle_processor_returns_failed_progress_response(self):
+ bundle_processor = mock.MagicMock()
+ bundle_processor_cache = BundleProcessorCache(None, None, {})
+ bundle_processor_cache.activate('instruction_id')
+ worker = SdkWorker(bundle_processor_cache)
+
+ # Add a mock bundle processor as if it was running before it's discarded
+ bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+ 'descriptor_id', bundle_processor)
+ bundle_processor_cache.discard('instruction_id')
+ split_request = beam_fn_api_pb2.InstructionRequest(
+ instruction_id='progress_instruction_id',
+ process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressRequest(
+ instruction_id='instruction_id'))
+ hc.assert_that(
+ worker.do_instruction(split_request).error,
+ hc.contains_string(
+ 'Bundle processing associated with instruction_id has failed'))
+
+ def test_inactive_bundle_processor_returns_empty_split_response(self):
+ bundle_processor = mock.MagicMock()
+ bundle_processor_cache = BundleProcessorCache(None, None, {})
+ bundle_processor_cache.activate('instruction_id')
+ worker = SdkWorker(bundle_processor_cache)
+ split_request = beam_fn_api_pb2.InstructionRequest(
+ instruction_id='split_instruction_id',
+ process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
+ instruction_id='instruction_id'))
+ self.assertEqual(
+ worker.do_instruction(split_request),
+ beam_fn_api_pb2.InstructionResponse(
+ instruction_id='split_instruction_id',
+ process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitResponse()))
+
+ # Add a mock bundle processor as if it was running before it's released
+ bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+ 'descriptor_id', bundle_processor)
+ bundle_processor_cache.release('instruction_id')
+ self.assertEqual(
+ worker.do_instruction(split_request),
+ beam_fn_api_pb2.InstructionResponse(
+ instruction_id='split_instruction_id',
+ process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitResponse()))
+
+ def test_failed_bundle_processor_returns_failed_split_response(self):
+ bundle_processor = mock.MagicMock()
+ bundle_processor_cache = BundleProcessorCache(None, None, {})
+ bundle_processor_cache.activate('instruction_id')
+ worker = SdkWorker(bundle_processor_cache)
+
+ # Add a mock bundle processor as if it was running before it's discarded
+ bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+ 'descriptor_id', bundle_processor)
+ bundle_processor_cache.discard('instruction_id')
+ split_request = beam_fn_api_pb2.InstructionRequest(
+ instruction_id='split_instruction_id',
+ process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
+ instruction_id='instruction_id'))
+ hc.assert_that(
+ worker.do_instruction(split_request).error,
+ hc.contains_string(
+ 'Bundle processing associated with instruction_id has failed'))
+
def test_log_lull_in_bundle_processor(self):
bundle_processor_cache = mock.MagicMock()
worker = SdkWorker(bundle_processor_cache)