You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2024/03/22 03:55:30 UTC
(beam) branch master updated: Add warning when retrieving state cache inefficiency detected (#30696)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bada1082d3c Add warning when retrieving state cache inefficiency detected (#30696)
bada1082d3c is described below
commit bada1082d3c52c7c56a996206bd3a39a78eb4e39
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Mar 21 23:55:24 2024 -0400
Add warning when retrieving state cache inefficiency detected (#30696)
* Add warning when retrieving state cache inefficiency detected
* State the specific pipeline option or API
* fix pylint
* formatting
---
.../apache_beam/runners/worker/sdk_worker.py | 46 +++++++++++++++++++---
1 file changed, 41 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 65059ab054f..c4182bdc452 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -1165,6 +1165,11 @@ class GlobalCachingStateHandler(CachingStateHandler):
self._state_cache = global_state_cache
self._context = threading.local()
+ # state retrieval time statistics
+ self._retrieval_time = 0.0
+ self._get_raw_called = 0
+ self._warn_interval = 60.0
+
@contextlib.contextmanager
def process_instruction_id(self, bundle_id, cache_tokens):
# type: (str, Iterable[beam_fn_api_pb2.ProcessBundleRequest.CacheToken]) -> Iterator[None]
@@ -1278,14 +1283,46 @@ class GlobalCachingStateHandler(CachingStateHandler):
:return A generator which returns the next element if advanced.
"""
while True:
- data, continuation_token = (
- self._underlying.get_raw(state_key, continuation_token))
- input_stream = coder_impl.create_InputStream(data)
+ input_stream, continuation_token = self._get_raw(
+ state_key, continuation_token)
+
while input_stream.size() > 0:
yield coder.decode_from_stream(input_stream, True)
if not continuation_token:
break
+ def _get_raw(self,
+ state_key, # type: beam_fn_api_pb2.StateKey
+ continuation_token # type: Optional[bytes]
+ ):
+ # type: (...) -> Tuple[coder_impl.create_InputStream, Optional[bytes]]
+
+ """Call underlying get_raw with performance statistics and detection."""
+ start_time = time.time()
+
+ data, continuation_token = (
+ self._underlying.get_raw(state_key, continuation_token))
+
+ input_stream = coder_impl.create_InputStream(data)
+
+ self._retrieval_time += time.time() - start_time
+ self._get_raw_called += 1
+
+ if self._retrieval_time > self._warn_interval:
+ _LOGGER.warning(
+ "Retrieving state %d times costed %.0f seconds. It may be due to "
+ "insufficient state cache size and/or frequent direct access of "
+ "states.\nConsider adding '--max_cache_memory_usage_mb' pipeline "
+ "option to increase state cache size or switch to materialized "
+ "(pvalue.AsList) side input if applicable." %
+ (self._get_raw_called, self._retrieval_time))
+ # reset counts
+ self._retrieval_time = 0.0
+ self._get_raw_called = 0
+ self._warn_interval *= 2
+
+ return input_stream, continuation_token
+
def _get_cache_token(self, state_key):
# type: (beam_fn_api_pb2.StateKey) -> Optional[bytes]
if not self._state_cache.is_cache_enabled():
@@ -1312,9 +1349,8 @@ class GlobalCachingStateHandler(CachingStateHandler):
"""Materialized the first page of data, concatenated with a lazy iterable
of the rest, if any.
"""
- data, continuation_token = self._underlying.get_raw(state_key, None)
+ input_stream, continuation_token = self._get_raw(state_key, None)
head = []
- input_stream = coder_impl.create_InputStream(data)
while input_stream.size() > 0:
head.append(coder.decode_from_stream(input_stream, True))