You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/09/25 21:13:54 UTC

[beam] branch master updated: [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner. (#12934)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3840369  [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner. (#12934)
3840369 is described below

commit 38403698934c422968320a0bdd834e8b9ae598a3
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Fri Sep 25 14:13:21 2020 -0700

    [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner. (#12934)
    
    * [BEAM-10959] Store a fixed amount of known process bundle instructions to prevent failures due to concurrency within a runner.
    
    * fixup! Fix typo in typing information
    
    * fixup! Address PR comments
---
 .../runners/portability/fn_api_runner/fn_runner.py |  14 +-
 .../apache_beam/runners/worker/sdk_worker.py       | 172 +++++++++++++++------
 .../apache_beam/runners/worker/sdk_worker_test.py  |  92 +++++++++++
 3 files changed, 225 insertions(+), 53 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 4ce14e2..9c43f33 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -793,12 +793,15 @@ class BundleManager(object):
         split_response = self._worker_handler.control_conn.push(
             split_request).get()  # type: beam_fn_api_pb2.InstructionResponse
         for t in (0.05, 0.1, 0.2):
-          waiting = ('Instruction not running', 'not yet scheduled')
-          if any(msg in split_response.error for msg in waiting):
+          if ('Unknown process bundle' in split_response.error or
+              split_response.process_bundle_split ==
+              beam_fn_api_pb2.ProcessBundleSplitResponse()):
             time.sleep(t)
             split_response = self._worker_handler.control_conn.push(
                 split_request).get()
-        if 'Unknown process bundle' in split_response.error:
+        if ('Unknown process bundle' in split_response.error or
+            split_response.process_bundle_split ==
+            beam_fn_api_pb2.ProcessBundleSplitResponse()):
           # It may have finished too fast.
           split_result = None
         elif split_response.error:
@@ -894,7 +897,10 @@ class BundleManager(object):
       finalize_request = beam_fn_api_pb2.InstructionRequest(
           finalize_bundle=beam_fn_api_pb2.FinalizeBundleRequest(
               instruction_id=process_bundle_id))
-      self._worker_handler.control_conn.push(finalize_request)
+      finalize_response = self._worker_handler.control_conn.push(
+          finalize_request).get()
+      if finalize_response.error:
+        raise RuntimeError(finalize_response.error)
 
     return result, split_results
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 1a8e093..b6070dc 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -87,6 +87,13 @@ LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60
 # Full thread dump is performed if the lull is more than 20 minutes.
 LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60
 
+# The number of ProcessBundleRequest instruction ids the BundleProcessorCache
+# will remember for instructions that are known but not running.
+MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS = 1000
+# The number of ProcessBundleRequest instruction ids that BundleProcessorCache
+# will remember for failed instructions.
+MAX_FAILED_INSTRUCTIONS = 10000
+
 
 class ShortIdCache(object):
   """ Cache for MonitoringInfo "short ids"
@@ -274,6 +281,7 @@ class SdkHarness(object):
 
   def _request_process_bundle(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
+    self._bundle_processor_cache.activate(request.instruction_id)
     self._request_execute(request)
 
   def _request_process_bundle_split(self, request):
@@ -286,22 +294,9 @@ class SdkHarness(object):
 
   def _request_process_bundle_action(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
-
     def task():
-      instruction_id = getattr(
-          request, request.WhichOneof('request')).instruction_id
-      # only process progress/split request when a bundle is in processing.
-      if (instruction_id in
-          self._bundle_processor_cache.active_bundle_processors):
-        self._execute(
-            lambda: self.create_worker().do_instruction(request), request)
-      else:
-        self._execute(
-            lambda: beam_fn_api_pb2.InstructionResponse(
-                instruction_id=request.instruction_id,
-                error=('Unknown process bundle instruction {}').format(
-                    instruction_id)),
-            request)
+      self._execute(
+          lambda: self.create_worker().do_instruction(request), request)
 
     self._report_progress_executor.submit(task)
 
@@ -354,6 +349,8 @@ class BundleProcessorCache(object):
     self.fns = fns
     self.state_handler_factory = state_handler_factory
     self.data_channel_factory = data_channel_factory
+    self.known_not_running_instruction_ids = collections.OrderedDict()
+    self.failed_instruction_ids = collections.OrderedDict()
     self.active_bundle_processors = {
     }  # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
     self.cached_bundle_processors = collections.defaultdict(
@@ -361,6 +358,7 @@ class BundleProcessorCache(object):
     self.last_access_times = collections.defaultdict(
         float)  # type: DefaultDict[str, float]
     self._schedule_periodic_shutdown()
+    self._lock = threading.Lock()
 
   def register(self, bundle_descriptor):
     # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None
@@ -368,6 +366,17 @@ class BundleProcessorCache(object):
     """Register a ``beam_fn_api_pb2.ProcessBundleDescriptor`` by its id."""
     self.fns[bundle_descriptor.id] = bundle_descriptor
 
+  def activate(self, instruction_id):
+    # type: (str) -> None
+
+    """Makes the ``instruction_id`` known to the bundle processor.
+
+    Allows ``lookup`` to return ``None``. Necessary if ``lookup`` can occur
+    before ``get``.
+    """
+    with self._lock:
+      self.known_not_running_instruction_ids[instruction_id] = True
+
   def get(self, instruction_id, bundle_descriptor_id):
     # type: (str, str) -> bundle_processor.BundleProcessor
 
@@ -376,17 +385,37 @@ class BundleProcessorCache(object):
 
     Moves the ``BundleProcessor`` from the inactive to the active cache.
     """
-    try:
-      # pop() is threadsafe
-      processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
-    except IndexError:
-      processor = bundle_processor.BundleProcessor(
-          self.fns[bundle_descriptor_id],
-          self.state_handler_factory.create_state_handler(
-              self.fns[bundle_descriptor_id].state_api_service_descriptor),
-          self.data_channel_factory)
-    self.active_bundle_processors[
+    with self._lock:
+      try:
+        # pop() is threadsafe
+        processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
+        self.active_bundle_processors[
+          instruction_id] = bundle_descriptor_id, processor
+        try:
+          del self.known_not_running_instruction_ids[instruction_id]
+        except KeyError:
+          # The instruction may not have been pre-registered before execution
+          # since activate() may never have been invoked
+          pass
+        return processor
+      except IndexError:
+        pass
+
+    # Make sure we instantiate the processor while not holding the lock.
+    processor = bundle_processor.BundleProcessor(
+        self.fns[bundle_descriptor_id],
+        self.state_handler_factory.create_state_handler(
+            self.fns[bundle_descriptor_id].state_api_service_descriptor),
+        self.data_channel_factory)
+    with self._lock:
+      self.active_bundle_processors[
         instruction_id] = bundle_descriptor_id, processor
+      try:
+        del self.known_not_running_instruction_ids[instruction_id]
+      except KeyError:
+        # The instruction may not have been pre-registered before execution
+        # since activate() may never have been invoked
+        pass
     return processor
 
   def lookup(self, instruction_id):
@@ -394,17 +423,38 @@ class BundleProcessorCache(object):
 
     """
     Return the requested ``BundleProcessor`` from the cache.
+
+    Will return ``None`` if the BundleProcessor is known but not yet ready. Will
+    raise an error if the ``instruction_id`` is not known or has been discarded.
     """
-    return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
+    with self._lock:
+      if instruction_id in self.failed_instruction_ids:
+        raise RuntimeError(
+            'Bundle processing associated with %s has failed. '
+            'Check prior failing response for details.' % instruction_id)
+      processor = self.active_bundle_processors.get(
+          instruction_id, (None, None))[-1]
+      if processor:
+        return processor
+      if instruction_id in self.known_not_running_instruction_ids:
+        return None
+      raise RuntimeError('Unknown process bundle id %s.' % instruction_id)
 
   def discard(self, instruction_id):
     # type: (str) -> None
 
     """
-    Remove the ``BundleProcessor`` from the cache.
+    Marks the instruction id as failed and shuts down the ``BundleProcessor``.
     """
-    self.active_bundle_processors[instruction_id][1].shutdown()
-    del self.active_bundle_processors[instruction_id]
+    with self._lock:
+      self.failed_instruction_ids[instruction_id] = True
+      while len(self.failed_instruction_ids) > MAX_FAILED_INSTRUCTIONS:
+        self.failed_instruction_ids.popitem(last=False)
+      processor = self.active_bundle_processors[instruction_id][1]
+      del self.active_bundle_processors[instruction_id]
+
+    # Perform the shutdown while not holding the lock.
+    processor.shutdown()
 
   def release(self, instruction_id):
     # type: (str) -> None
@@ -415,10 +465,19 @@ class BundleProcessorCache(object):
     Resets the ``BundleProcessor`` and moves it from the active to the
     inactive cache.
     """
-    descriptor_id, processor = self.active_bundle_processors.pop(instruction_id)
+    with self._lock:
+      self.known_not_running_instruction_ids[instruction_id] = True
+      while len(self.known_not_running_instruction_ids
+                ) > MAX_KNOWN_NOT_RUNNING_INSTRUCTIONS:
+        self.known_not_running_instruction_ids.popitem(last=False)
+      descriptor_id, processor = (
+          self.active_bundle_processors.pop(instruction_id))
+
+    # Make sure that we reset the processor while not holding the lock.
     processor.reset()
-    self.last_access_times[descriptor_id] = time.time()
-    self.cached_bundle_processors[descriptor_id].append(processor)
+    with self._lock:
+      self.last_access_times[descriptor_id] = time.time()
+      self.cached_bundle_processors[descriptor_id].append(processor)
 
   def shutdown(self):
     """
@@ -543,15 +602,19 @@ class SdkWorker(object):
                            instruction_id  # type: str
                           ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
-    processor = self.bundle_processor_cache.lookup(request.instruction_id)
-    if processor:
-      return beam_fn_api_pb2.InstructionResponse(
-          instruction_id=instruction_id,
-          process_bundle_split=processor.try_split(request))
-    else:
+    try:
+      processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    except RuntimeError:
       return beam_fn_api_pb2.InstructionResponse(
-          instruction_id=instruction_id,
-          error='Instruction not running: %s' % instruction_id)
+          instruction_id=instruction_id, error=traceback.format_exc())
+    # Return an empty response if we aren't running. This can happen
+    # if the ProcessBundleRequest has not started or already finished.
+    process_bundle_split = (
+        processor.try_split(request)
+        if processor else beam_fn_api_pb2.ProcessBundleSplitResponse())
+    return beam_fn_api_pb2.InstructionResponse(
+        instruction_id=instruction_id,
+        process_bundle_split=process_bundle_split)
 
   def _log_lull_in_bundle_processor(self, processor):
     sampler_info = processor.state_sampler.get_info()
@@ -603,12 +666,18 @@ class SdkWorker(object):
                               instruction_id  # type: str
                              ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
-    # It is an error to get progress for a not-in-flight bundle.
-    processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    try:
+      processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    except RuntimeError:
+      return beam_fn_api_pb2.InstructionResponse(
+          instruction_id=instruction_id, error=traceback.format_exc())
     if processor:
       self._log_lull_in_bundle_processor(processor)
-
-    monitoring_infos = processor.monitoring_infos() if processor else []
+      monitoring_infos = processor.monitoring_infos()
+    else:
+      # Return an empty response if we aren't running. This can happen
+      # if the ProcessBundleRequest has not started or already finished.
+      monitoring_infos = []
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
         process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
@@ -634,7 +703,11 @@ class SdkWorker(object):
                       instruction_id  # type: str
                      ):
     # type: (...) -> beam_fn_api_pb2.InstructionResponse
-    processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    try:
+      processor = self.bundle_processor_cache.lookup(request.instruction_id)
+    except RuntimeError:
+      return beam_fn_api_pb2.InstructionResponse(
+          instruction_id=instruction_id, error=traceback.format_exc())
     if processor:
       try:
         finalize_response = processor.finalize_bundle()
@@ -644,10 +717,11 @@ class SdkWorker(object):
       except:
         self.bundle_processor_cache.discard(request.instruction_id)
         raise
-    else:
-      return beam_fn_api_pb2.InstructionResponse(
-          instruction_id=instruction_id,
-          error='Instruction not running: %s' % instruction_id)
+    # We can reach this state if there was an erroneous request to finalize
+    # the bundle while it is being initialized or has already been finalized
+    # and released.
+    raise RuntimeError(
+        'Bundle is not in a finalizable state for %s' % instruction_id)
 
   @contextlib.contextmanager
   def maybe_profile(self, instruction_id):
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 7f430c3..611ab7e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -32,6 +32,7 @@ from builtins import range
 from collections import namedtuple
 
 import grpc
+import hamcrest as hc
 import mock
 
 from apache_beam.coders import VarIntCoder
@@ -42,6 +43,7 @@ from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statecache
 from apache_beam.runners.worker import statesampler
+from apache_beam.runners.worker.sdk_worker import BundleProcessorCache
 from apache_beam.runners.worker.sdk_worker import CachingStateHandler
 from apache_beam.runners.worker.sdk_worker import SdkWorker
 from apache_beam.utils import thread_pool_executor
@@ -134,6 +136,96 @@ class SdkWorkerTest(unittest.TestCase):
         lull_duration_s * 1e9,
         threading.current_thread())
 
+  def test_inactive_bundle_processor_returns_empty_progress_response(self):
+    bundle_processor = mock.MagicMock()
+    bundle_processor_cache = BundleProcessorCache(None, None, {})
+    bundle_processor_cache.activate('instruction_id')
+    worker = SdkWorker(bundle_processor_cache)
+    split_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id='progress_instruction_id',
+        process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressRequest(
+            instruction_id='instruction_id'))
+    self.assertEqual(
+        worker.do_instruction(split_request),
+        beam_fn_api_pb2.InstructionResponse(
+            instruction_id='progress_instruction_id',
+            process_bundle_progress=beam_fn_api_pb2.
+            ProcessBundleProgressResponse()))
+
+    # Add a mock bundle processor as if it was running before it's released
+    bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+        'descriptor_id', bundle_processor)
+    bundle_processor_cache.release('instruction_id')
+    self.assertEqual(
+        worker.do_instruction(split_request),
+        beam_fn_api_pb2.InstructionResponse(
+            instruction_id='progress_instruction_id',
+            process_bundle_progress=beam_fn_api_pb2.
+            ProcessBundleProgressResponse()))
+
+  def test_failed_bundle_processor_returns_failed_progress_response(self):
+    bundle_processor = mock.MagicMock()
+    bundle_processor_cache = BundleProcessorCache(None, None, {})
+    bundle_processor_cache.activate('instruction_id')
+    worker = SdkWorker(bundle_processor_cache)
+
+    # Add a mock bundle processor as if it was running before it's discarded
+    bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+        'descriptor_id', bundle_processor)
+    bundle_processor_cache.discard('instruction_id')
+    split_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id='progress_instruction_id',
+        process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressRequest(
+            instruction_id='instruction_id'))
+    hc.assert_that(
+        worker.do_instruction(split_request).error,
+        hc.contains_string(
+            'Bundle processing associated with instruction_id has failed'))
+
+  def test_inactive_bundle_processor_returns_empty_split_response(self):
+    bundle_processor = mock.MagicMock()
+    bundle_processor_cache = BundleProcessorCache(None, None, {})
+    bundle_processor_cache.activate('instruction_id')
+    worker = SdkWorker(bundle_processor_cache)
+    split_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id='split_instruction_id',
+        process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
+            instruction_id='instruction_id'))
+    self.assertEqual(
+        worker.do_instruction(split_request),
+        beam_fn_api_pb2.InstructionResponse(
+            instruction_id='split_instruction_id',
+            process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitResponse()))
+
+    # Add a mock bundle processor as if it was running before it's released
+    bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+        'descriptor_id', bundle_processor)
+    bundle_processor_cache.release('instruction_id')
+    self.assertEqual(
+        worker.do_instruction(split_request),
+        beam_fn_api_pb2.InstructionResponse(
+            instruction_id='split_instruction_id',
+            process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitResponse()))
+
+  def test_failed_bundle_processor_returns_failed_split_response(self):
+    bundle_processor = mock.MagicMock()
+    bundle_processor_cache = BundleProcessorCache(None, None, {})
+    bundle_processor_cache.activate('instruction_id')
+    worker = SdkWorker(bundle_processor_cache)
+
+    # Add a mock bundle processor as if it was running before it's discarded
+    bundle_processor_cache.active_bundle_processors['instruction_id'] = (
+        'descriptor_id', bundle_processor)
+    bundle_processor_cache.discard('instruction_id')
+    split_request = beam_fn_api_pb2.InstructionRequest(
+        instruction_id='split_instruction_id',
+        process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
+            instruction_id='instruction_id'))
+    hc.assert_that(
+        worker.do_instruction(split_request).error,
+        hc.contains_string(
+            'Bundle processing associated with instruction_id has failed'))
+
   def test_log_lull_in_bundle_processor(self):
     bundle_processor_cache = mock.MagicMock()
     worker = SdkWorker(bundle_processor_cache)