Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/16 07:33:23 UTC

[GitHub] [beam] mxm commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

mxm commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440643909



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       `extend` is called when new state gets appended. At line 963 we know that the cache already contains the head of the state plus an iterator that retrieves the rest from the runner, because continuation tokens are used for large state. We don't want to cache any further values, which is why we skip updating the cache and just send the new values to the runner (below). When we later retrieve this state from the cache, the head is served from the cache and the rest, including the appended values, is fetched from the runner.
   
   Should it be an error? No, because we still want to be able to append to large state.
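   
   To make this concrete, here is a minimal, self-contained sketch (not the PR code and not Beam's API) of the list vs. itertools.chain distinction described above. FakeRunnerState, read_state, extend_state and HEAD_SIZE are hypothetical stand-ins for the runner-side state, the cache read path and the continuation-token threshold.

import itertools

# Hypothetical sketch: FakeRunnerState, read_state, extend_state and HEAD_SIZE
# are stand-ins, not Beam classes. They only illustrate the caching behaviour
# described above.


class FakeRunnerState(object):
  """Stands in for the authoritative state held by the runner."""

  def __init__(self, values):
    self.values = list(values)

  def append(self, elements):
    self.values.extend(elements)

  def read_tail(self, offset):
    # Stands in for a continuation-token read of everything past the head.
    return iter(self.values[offset:])


HEAD_SIZE = 2  # assumed threshold for "too large to cache fully"


def read_state(cache, runner, key):
  # Fully cached values come back as a list; large values come back as an
  # itertools.chain of the cached head and a lazy read from the runner.
  head = cache.setdefault(key, runner.values[:HEAD_SIZE])
  if len(runner.values) <= HEAD_SIZE:
    return head
  return itertools.chain(head, runner.read_tail(len(head)))


def extend_state(cache, runner, key, elements):
  # The extend path from the diff: only touch the cache if the value is
  # fully cached; the runner always receives the new values.
  elements = list(elements)
  cached_value = read_state(cache, runner, key)
  if isinstance(cached_value, list):
    cached_value.extend(elements)  # fully cached: keep the cache in sync
  elif isinstance(cached_value, itertools.chain):
    pass  # partially cached: skip the cache, the runner serves the tail
  runner.append(elements)


# State too large to cache fully: the cache keeps only [1, 2], yet a later
# read still sees the appended values because the tail comes from the runner.
cache, runner = {}, FakeRunnerState([1, 2, 3, 4])
extend_state(cache, runner, 'key', [5, 6])
assert cache['key'] == [1, 2]
assert list(read_state(cache, runner, 'key')) == [1, 2, 3, 4, 5, 6]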



