Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/15 15:31:46 UTC

[GitHub] [beam] mxm opened a new pull request #12012: [BEAM-10260] Fix continuation token support with statecache

mxm opened a new pull request #12012:
URL: https://github.com/apache/beam/pull/12012


   Continuation mode was always on by mistake. In addition, continuation mode
   nested appended state in an ever-deeper chain of iterators, which would lead
   to the following error:
   
   ```
   RecursionError: maximum recursion depth exceeded while calling a Python object
   ```
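
A minimal sketch of how this kind of error can arise, assuming each append wraps the previous iterator in one more layer. The helpers `append_lazily`, `nesting_overflows` and `append_flat` are hypothetical, and plain generators are used as a stand-in for the iterators in `sdk_worker.py`:

```python
import sys


def append_lazily(state, new_elements):
    """Hypothetical helper: wraps the previous iterator in one more layer."""
    yield from state
    yield from new_elements


def nesting_overflows(depth):
    """Returns True if reading state nested `depth` layers deep fails."""
    state = iter([0])
    for i in range(depth):
        state = append_lazily(state, [i])
    try:
        # Resuming the outermost layer resumes every layer below it.
        next(state)
    except RecursionError:
        return True
    return False


def append_flat(state, new_elements):
    """Alternative: keep the state as one flat list, so depth never grows."""
    state.extend(new_elements)
    return state
```

Calling `nesting_overflows(sys.getrecursionlimit() * 2)` is expected to hit the interpreter's recursion limit, while the flat list can absorb any number of appends.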
   
   When continuation tokens are used, we no longer cache anything after the
   continuation. Caching after the continuation token would defeat the purpose of
   using continuation tokens for large state. A test was added for continuation
   tokens.
   
   The extend operation was removed from the state cache in favor of using only
   put/get/clear, which avoids race conditions with cache eviction.
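
A rough sketch of the idea, assuming a small LRU cache. `TinyStateCache` and `append_if_cached` are hypothetical names for illustration and are not Beam's state cache API. The caller reads the cached value once and then mutates only the object it got back, so an eviction that happens in between cannot make the append fail:

```python
from collections import OrderedDict


class TinyStateCache:
    """Hypothetical LRU cache exposing only get/put/clear (not Beam's class)."""

    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._entries = OrderedDict()

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # Evict the least recently used.

    def clear(self, key):
        self._entries.pop(key, None)


def append_if_cached(cache, key, new_elements):
    """Extends the cached list in place; returns False on a cache miss."""
    value = cache.get(key)
    if value is None:
        # Never cached or already evicted: there is nothing to update.
        return False
    # Only the returned object is touched from here on, not the cache.
    value.extend(new_elements)
    return True
```

In this sketch a miss is not an error: the new elements would still be sent to the runner, which remains the source of truth.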
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12012:
URL: https://github.com/apache/beam/pull/12012#issuecomment-644210200


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm merged pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
mxm merged pull request #12012:
URL: https://github.com/apache/beam/pull/12012


   





[GitHub] [beam] tweise commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440568137



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)

Review comment:
       Isn't `cached_value.extend(elements)` the only place where this matters?

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       Why would `extend` be called in this case? Should this be an error?







[GitHub] [beam] mxm commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440745656



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)

Review comment:
       I've moved the call down to execute only when caching is enabled.







[GitHub] [beam] tweise commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440981766



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       Makes sense, I missed that the new elements are still written to the state handler.







[GitHub] [beam] mxm commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440644321



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)

Review comment:
       Yes, though we will iterate twice over `elements`: 1) for the cache and 2) for serializing the elements for the runner. Materializing to a list ensures that both passes see the same results. An Iterable could theoretically return different results on each pass.
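
A small illustration of the concern, assuming the input is a one-shot iterable such as a generator. `send_to_runner` is a hypothetical stand-in for serializing the elements for the runner, and both `extend_*` functions are illustrative only:

```python
def send_to_runner(elements):
    """Hypothetical stand-in for serializing elements for the runner."""
    return [str(e) for e in elements]


def extend_without_materializing(cached_value, elements):
    # The first pass consumes a one-shot iterable.
    cached_value.extend(elements)
    # The second pass may then see nothing at all.
    return send_to_runner(elements)


def extend_with_materializing(cached_value, elements):
    # Materialize once so that both passes see the same elements.
    elements = list(elements)
    cached_value.extend(elements)
    return send_to_runner(elements)
```

With a generator as input, the first variant updates the cache but sends an empty payload, while the second keeps the cache and the payload in sync.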







[GitHub] [beam] mxm commented on a change in pull request #12012: [BEAM-10260] Fix continuation token support with statecache

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440643909



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       `extend` is called when new state gets appended. In line 963 we know that the cache already contains the head of the state plus an iterator which retrieves the rest from the runner, because continuation tokens are used for large state. We don't want to cache further values, so we skip updating the cache and just send the new values to the runner (below). When we retrieve this state from the cache, the head will be served from the cache and the rest will be retrieved from the runner, including the appended values.
   
   Should it be an error? No, because we still want to be able to append to large state.
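
The behavior described above can be sketched roughly as follows. `FakeRunner`, `load_state`, `iterate_state` and `extend_state` are all hypothetical names, and using an integer offset as the continuation token is an assumption made for illustration; none of this is the code in `sdk_worker.py`:

```python
class FakeRunner:
    """Hypothetical runner that serves state in pages of `page_size`."""

    def __init__(self, elements, page_size):
        self.elements = list(elements)
        self.page_size = page_size

    def get_page(self, token=0):
        # Returns (page, continuation_token); the token is None on the last page.
        end = token + self.page_size
        next_token = end if end < len(self.elements) else None
        return self.elements[token:end], next_token

    def append(self, new_elements):
        self.elements.extend(new_elements)


def load_state(runner):
    """Returns what would be cached: the head and a continuation token."""
    return runner.get_page()


def iterate_state(cached, runner):
    """Yields the cached head, then lazily fetches the rest from the runner."""
    head, token = cached
    yield from head
    while token is not None:
        page, token = runner.get_page(token)
        yield from page


def extend_state(cached, runner, new_elements):
    new_elements = list(new_elements)
    head, token = cached
    if token is None:
        # The state is fully cached, so the cached list can be extended.
        head.extend(new_elements)
    # Otherwise the state is large: skip the cache entirely.
    # In both cases the runner receives the new elements.
    runner.append(new_elements)
```

In the large-state case the cached head stays untouched, yet a later read still sees the appended values because the tail is fetched from the runner.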






