Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/16 04:06:19 UTC

[GitHub] [beam] tweise commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

tweise commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440568137



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)

Review comment:
       Isn't `cached_value.extend(elements)` the only place where this matters?
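       For context, here is a minimal sketch of why materializing `elements` can matter. It assumes that `elements` may be a one-shot iterator (for example a generator) and that `extend` iterates it more than once, e.g. once to update the cache and again to encode the append request. The helper names below are hypothetical and are not part of `sdk_worker.py`.

       ```python
       def gen_elements():
           # A generator: its values can be consumed only once.
           yield from [1, 2, 3]

       def extend_naive(elements):
           # Hypothetical: update a cache, then encode the same input for a request.
           cache = []
           cache.extend(elements)      # first pass consumes the iterator
           request = list(elements)    # second pass sees nothing left
           return cache, request

       def extend_materialized(elements):
           elements = list(elements)   # materialize once, as the diff does
           cache = []
           cache.extend(elements)
           request = list(elements)
           return cache, request
       ```

       With a generator, `extend_naive(gen_elements())` gives `([1, 2, 3], [])`, while `extend_materialized(gen_elements())` gives `([1, 2, 3], [1, 2, 3])`. So the `list()` call matters wherever `elements` is iterated more than once in the method, not only at `cached_value.extend(elements)`.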

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       Why would `extend` be called in this case? Should this be an error?
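       To make the question concrete, here is a small sketch of the two cached shapes in the diff. A fully cached state is a `list`, which can be extended in place. A partially cached state (continuation token used) is an `itertools.chain`, which has no `extend` method and is a single-pass iterator. The helper `extend_cached_value` is hypothetical, and raising in the chain case is just one possible answer to "should this be an error?", not what the PR does.

       ```python
       import itertools

       def extend_cached_value(cached_value, elements):
           """Hypothetical helper: append elements to a cached state value."""
           if isinstance(cached_value, list):
               # Fully cached: mutate the cached object in place.
               cached_value.extend(elements)
               return cached_value
           if isinstance(cached_value, itertools.chain):
               # Partially cached: a chain cannot be extended in place, and
               # wrapping it in a new chain would still be single-pass.
               raise TypeError("cannot extend a partially cached (chained) state value")
           raise TypeError("unexpected cached value type: %s" % type(cached_value).__name__)
       ```

       For example, `extend_cached_value([1, 2], [3])` returns `[1, 2, 3]`, while passing `itertools.chain([1, 2], iter([3]))` raises `TypeError`. Returning `itertools.chain(cached_value, elements)` instead would avoid the error, but the first reader to iterate the cached chain would exhaust it for everyone else.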




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org