You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by yh...@apache.org on 2024/03/22 03:55:30 UTC

(beam) branch master updated: Add warning when retrieving state cache inefficiency detected (#30696)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bada1082d3c Add warning when retrieving state cache inefficiency detected (#30696)
bada1082d3c is described below

commit bada1082d3c52c7c56a996206bd3a39a78eb4e39
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Mar 21 23:55:24 2024 -0400

    Add warning when retrieving state cache inefficiency detected (#30696)
    
    * Add warning when retrieving state cache inefficiency detected
    
    * State the specific pipeline option or API
    
    * fix pylint
    
    * formatting
---
 .../apache_beam/runners/worker/sdk_worker.py       | 46 +++++++++++++++++++---
 1 file changed, 41 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 65059ab054f..c4182bdc452 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -1165,6 +1165,11 @@ class GlobalCachingStateHandler(CachingStateHandler):
     self._state_cache = global_state_cache
     self._context = threading.local()
 
+    # state retrieval time statistics
+    self._retrieval_time = 0.0
+    self._get_raw_called = 0
+    self._warn_interval = 60.0
+
   @contextlib.contextmanager
   def process_instruction_id(self, bundle_id, cache_tokens):
     # type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken]) -> Iterator[None]
@@ -1278,14 +1283,46 @@ class GlobalCachingStateHandler(CachingStateHandler):
        :return A generator which returns the next element if advanced.
     """
     while True:
-      data, continuation_token = (
-          self._underlying.get_raw(state_key, continuation_token))
-      input_stream = coder_impl.create_InputStream(data)
+      input_stream, continuation_token = self._get_raw(
+          state_key, continuation_token)
+
       while input_stream.size() > 0:
         yield coder.decode_from_stream(input_stream, True)
       if not continuation_token:
         break
 
+  def _get_raw(self,
+      state_key,  # type: beam_fn_api_pb2.StateKey
+      continuation_token  # type: Optional[bytes]
+               ):
+    # type: (...) -> Tuple[coder_impl.create_InputStream, Optional[bytes]]
+    """Fetch one page of state, accumulating time spent on retrieval.
+
+    Returns the page as an input stream and the next continuation token."""
+    # Monotonic clock so durations are unaffected by wall-clock adjustments.
+    start_time = time.monotonic()
+    data, continuation_token = (
+        self._underlying.get_raw(state_key, continuation_token))
+    input_stream = coder_impl.create_InputStream(data)
+
+    self._retrieval_time += time.monotonic() - start_time
+    self._get_raw_called += 1
+
+    if self._retrieval_time > self._warn_interval:
+      _LOGGER.warning(
+          "Retrieving state %d times took %.0f seconds. It may be due to "
+          "insufficient state cache size and/or frequent direct access of "
+          "states.\nConsider adding '--max_cache_memory_usage_mb' pipeline "
+          "option to increase state cache size or switch to materialized "
+          "(pvalue.AsList) side input if applicable.",
+          self._get_raw_called, self._retrieval_time)
+      # Reset the statistics and double the threshold so that repeated
+      # warnings back off exponentially instead of flooding the log.
+      self._retrieval_time = 0.0
+      self._get_raw_called = 0
+      self._warn_interval *= 2
+    return input_stream, continuation_token
+
   def _get_cache_token(self, state_key):
     # type: (beam_fn_api_pb2.StateKey) -> Optional[bytes]
     if not self._state_cache.is_cache_enabled():
@@ -1312,9 +1349,8 @@ class GlobalCachingStateHandler(CachingStateHandler):
     """Materialized the first page of data, concatenated with a lazy iterable
     of the rest, if any.
     """
-    data, continuation_token = self._underlying.get_raw(state_key, None)
+    input_stream, continuation_token = self._get_raw(state_key, None)
     head = []
-    input_stream = coder_impl.create_InputStream(data)
     while input_stream.size() > 0:
       head.append(coder.decode_from_stream(input_stream, True))