Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/06 17:25:49 UTC

[GitHub] [beam] lukecwik opened a new pull request, #23046: [#23000] Update the Python SDK harness state cache to be a loading cache

lukecwik opened a new pull request, #23046:
URL: https://github.com/apache/beam/pull/23046

   Also record statistics for the cost of performing a load.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ryanthompson591 commented on pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1243932997

   Is this still being changed? I had a couple of outstanding questions, but I think they were simple. Overall this looked pretty good to me.




[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966397992


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297
-      materialized = cached_value = (
-          self._partially_cached_iterable(state_key, coder))
-      if isinstance(materialized, (list, self.ContinuationIterable)):
-        self._state_cache.put(cache_state_key, cache_token, materialized)
-      else:
-        _LOGGER.error(
-            "Uncacheable type %s for key %s. Not caching.",
-            materialized,
-            state_key)
-    return cached_value
+    return self._state_cache.get(
+        (cache_state_key, cache_token),

Review Comment:
   The cache token is referring to this [token](https://github.com/apache/beam/blob/f0cd27596e3a59d6ee013ee2db232d24f46ad70a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L297) that represents all user state. We still need to create a cache key that is scoped to the specific user state (e.g. which ptransform and what state key).
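To illustrate the scoping point, here is a minimal sketch. It assumes a plain dict stands in for the state cache, and the byte strings are made-up placeholders rather than real state keys or tokens. Because the token is shared by all user state, only the pair (state key, token) identifies one cached value.

```python
# Hypothetical stand-ins: real state keys and tokens come from the runner.
cache = {}


def cache_key(state_key: bytes, cache_token: bytes) -> tuple:
    # The token alone is shared by all user state, so it is paired with the
    # state-specific key to get an entry scoped to one piece of state.
    return (state_key, cache_token)


token = b"token-1"
cache[cache_key(b"ptransform-a/state-x", token)] = [1, 2]
cache[cache_key(b"ptransform-b/state-y", token)] = [3]
```

With the token alone as the key, the second assignment would have overwritten the first.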





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966414091


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -87,18 +88,46 @@ def get_referents_for_cache(*objs):
   return rval
 
 
+class _LoadingValue(WeightedValue):
+  """Allows concurrent users of the cache to wait for a value to be loaded."""
+  def __init__(self):
+    # type: () -> None
+    super().__init__(None, 1)
+    self._wait_event = threading.Event()
+
+  def load(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> None
+    try:
+      self._value = loading_fn(key)
+    except Exception as err:
+      self._error = err
+    finally:
+      self._wait_event.set()
+
+  def value(self):
+    # type: () -> Any
+    self._wait_event.wait()
+    err = getattr(self, "_error", None)
+    if err:
+      raise err
+    return self._value
+
+
 class StateCache(object):
   """Cache for Beam state access, scoped by state key and cache_token.
      Assumes a bag state implementation.
 
-  For a given state_key and cache_token, caches a value and allows to
-    a) read from the cache (get),
-           if the currently stored cache_token matches the provided
-    b) write to the cache (put),
-           storing the new value alongside with a cache token
-    c) empty a cached element (clear),
-           if the currently stored cache_token matches the provided
+  For a given key, caches a value and allows to
+    a) peek at the cache (peek),

Review Comment:
   Clarified in the comments that the cache is an LRU cache.
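For readers unfamiliar with the pattern, a minimal LRU sketch built on `collections.OrderedDict`, the same container the hunk above uses. `TinyLRU` is a hypothetical miniature, not the class under review, and it counts entries instead of estimating object weights.

```python
import collections


class TinyLRU:
    """Hypothetical miniature cache; the limit is an entry count here."""
    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._cache = collections.OrderedDict()

    def peek(self, key):
        value = self._cache.get(key)
        if value is not None:
            # A successful lookup marks the entry as most recently used.
            self._cache.move_to_end(key)
        return value

    def put(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)
        while len(self._cache) > self._max_entries:
            # popitem(last=False) removes the least recently used entry.
            self._cache.popitem(last=False)


lru = TinyLRU(max_entries=2)
lru.put("a", 1)
lru.put("b", 2)
lru.peek("a")    # "a" becomes most recently used
lru.put("c", 3)  # evicts "b", the least recently used entry
```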





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r972396116


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
       self._cache.move_to_end(key)
       self._hit_count += 1
-      return value.value()
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()
+      self._cache[key] = loading_value
 
-  def put(self, state_key, cache_token, value):
-    # type: (bytes, Optional[bytes], Any) -> None
-    assert cache_token and self.is_cache_enabled()
+      # Ensure that we unlock the lock while loading to allow for parallel gets
+      self._lock.release()
+
+      start_time_ns = time.time_ns()
+      loading_value.load(key, loading_fn)
+      elapsed_time_ns = time.time_ns() - start_time_ns
+
+      # Replace the value in the cache with a weighted value now that the
+      # loading has completed successfully.
+      value = loading_value.value()
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(
+            'Expected object size to be >= 0 for %s but received %d.',
+            value,
+            weight)
+        weight = 8
+      value = WeightedValue(value, weight)
+      with self._lock:
+        self._load_count += 1
+        self._load_time_ns += elapsed_time_ns
+        # Don't replace values that have already been replaced with a different
+        # value by a put/invalidate that occurred concurrently with the load.
+        # The put/invalidate will have been responsible for updating the
+        # cache weight appropriately already.
+        old_value = self._cache.get(key, None)
+        if old_value is not loading_value:
+          return value.value()
+
+        self._current_weight -= loading_value.weight()
+        self._cache[key] = value
+        self._current_weight += value.weight()
+        while self._current_weight > self._max_weight:
+          (_, weighted_value) = self._cache.popitem(last=False)
+          self._current_weight -= weighted_value.weight()
+          self._evict_count += 1
+
+    else:
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+      self._lock.release()
+
+    return value.value()

Review Comment:
   A single failure will be shared by all callers that are blocked, but you're right that the value should be removed from the cache on failure so that a future attempt can try again. Updated to do so.
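A minimal sketch of that behavior, under simplifying assumptions: `TinyLoadingCache` and `_Pending` are hypothetical names, and weights, eviction, and statistics are left out. A failed load is raised to every caller waiting on it, and the entry is dropped so the next lookup can retry.

```python
import threading


class _Pending:
    """Placeholder that waiters block on until the load finishes."""
    def __init__(self):
        self.done = threading.Event()
        self.value = None
        self.error = None


class TinyLoadingCache:
    """Hypothetical sketch; no weights, eviction, or statistics."""
    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    def get(self, key, loading_fn):
        with self._lock:
            pending = self._entries.get(key)
            owner = pending is None
            if owner:
                pending = self._entries[key] = _Pending()
        if owner:
            try:
                # Load outside the lock so other lookups are not blocked.
                pending.value = loading_fn(key)
            except Exception as err:
                pending.error = err
                with self._lock:
                    # Drop the failed entry so a later get can retry.
                    if self._entries.get(key) is pending:
                        del self._entries[key]
            finally:
                pending.done.set()
        pending.done.wait()
        if pending.error is not None:
            raise pending.error
        return pending.value


attempts = []


def flaky_load(key):
    attempts.append(key)
    if len(attempts) == 1:
        raise ValueError("first load fails")
    return key.upper()


cache = TinyLoadingCache()
try:
    cache.get("k", flaky_load)
except ValueError:
    pass
result = cache.get("k", flaky_load)  # retried because the entry was removed
```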



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -181,12 +263,17 @@ def describe_stats(self):
         hit_ratio = 100.0 * self._hit_count / request_count
       else:
         hit_ratio = 100.0
-      return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
-          self._current_weight >> 20,
-          self._max_weight >> 20,
-          hit_ratio,
-          request_count,
-          self._evict_count)
+      return (
+          'used/max %d/%d MB, hit %.2f%%, lookups %d, '
+          'avg load time %.0f ns, loads %d, evictions %d') % (
+              self._current_weight >> 20,
+              self._max_weight >> 20,
+              hit_ratio,
+              request_count,
+              self._load_time_ns /
+              self._load_count if self._load_count > 0 else 0,

Review Comment:
   yup





[GitHub] [beam] ryanthompson591 commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r982763780


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +141,104 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()
+      self._cache[key] = loading_value
+
+      # Ensure that we unlock the lock while loading to allow for parallel gets
+      self._lock.release()
+
+      start_time_ns = time.time_ns()
+      loading_value.load(key, loading_fn)
+      elapsed_time_ns = time.time_ns() - start_time_ns
+
+      try:
+        value = loading_value.value()
+      except Exception as err:
+        # If loading failed then delete the value from the cache allowing for
+        # the next lookup to possibly succeed.
+        with self._lock:
+          self._load_count += 1
+          self._load_time_ns += elapsed_time_ns
+          # Don't remove values that have already been replaced with a different
+          # value by a put/invalidate that occurred concurrently with the load.
+          # The put/invalidate will have been responsible for updating the
+          # cache weight appropriately already.
+          old_value = self._cache.get(key, None)
+          if old_value is not loading_value:
+            raise err
+          self._current_weight -= loading_value.weight()
+          del self._cache[key]
+        raise err
+
+      # Replace the value in the cache with a weighted value now that the
+      # loading has completed successfully.
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(
+            'Expected object size to be >= 0 for %s but received %d.',
+            value,
+            weight)
+        weight = 8
+      value = WeightedValue(value, weight)
+      with self._lock:
+        self._load_count += 1
+        self._load_time_ns += elapsed_time_ns
+        # Don't replace values that have already been replaced with a different
+        # value by a put/invalidate that occurred concurrently with the load.
+        # The put/invalidate will have been responsible for updating the
+        # cache weight appropriately already.
+        old_value = self._cache.get(key, None)
+        if old_value is not loading_value:
+          return value.value()

Review Comment:
   What could make this happen?
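Going by the comment in the hunk above, the case is a put or invalidate for the same key that lands while the load is in flight. A single-threaded sketch of the identity check, with a plain dict and a hypothetical `placeholder` object standing in for the cache and the in-flight marker:

```python
cache = {}
placeholder = object()  # hypothetical stand-in for the in-flight marker
cache["k"] = placeholder

# Simulate a put for the same key landing while the load is in flight.
cache["k"] = "value from concurrent put"

loaded = "value from load"
if cache.get("k") is placeholder:
    # Nothing replaced the marker, so the loaded value is stored.
    cache["k"] = loaded
# Otherwise the newer entry is left alone; the caller still gets `loaded`.
```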



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +141,104 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()
+      self._cache[key] = loading_value
+
+      # Ensure that we unlock the lock while loading to allow for parallel gets
+      self._lock.release()
+
+      start_time_ns = time.time_ns()
+      loading_value.load(key, loading_fn)
+      elapsed_time_ns = time.time_ns() - start_time_ns
+
+      try:
+        value = loading_value.value()
+      except Exception as err:
+        # If loading failed then delete the value from the cache allowing for
+        # the next lookup to possibly succeed.
+        with self._lock:
+          self._load_count += 1
+          self._load_time_ns += elapsed_time_ns
+          # Don't remove values that have already been replaced with a different
+          # value by a put/invalidate that occurred concurrently with the load.
+          # The put/invalidate will have been responsible for updating the
+          # cache weight appropriately already.
+          old_value = self._cache.get(key, None)
+          if old_value is not loading_value:
+            raise err
+          self._current_weight -= loading_value.weight()
+          del self._cache[key]
+        raise err
+
+      # Replace the value in the cache with a weighted value now that the
+      # loading has completed successfully.
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(
+            'Expected object size to be >= 0 for %s but received %d.',
+            value,
+            weight)
+        weight = 8
+      value = WeightedValue(value, weight)
+      with self._lock:
+        self._load_count += 1
+        self._load_time_ns += elapsed_time_ns
+        # Don't replace values that have already been replaced with a different
+        # value by a put/invalidate that occurred concurrently with the load.
+        # The put/invalidate will have been responsible for updating the
+        # cache weight appropriately already.
+        old_value = self._cache.get(key, None)
+        if old_value is not loading_value:
+          return value.value()
+
+        self._current_weight -= loading_value.weight()
+        self._cache[key] = value
+        self._current_weight += value.weight()
+        while self._current_weight > self._max_weight:
+          (_, weighted_value) = self._cache.popitem(last=False)
+          self._current_weight -= weighted_value.weight()
+          self._evict_count += 1
+
+    else:

Review Comment:
   This method is long enough that I had trouble seeing which `if` this `else` matches. Maybe add a comment like `# value is not None`.
   
   Or potentially just put this at the top:
   
   with self._lock:
     value = self._cache.get(key, None)
     if value is not None:
       self._cache.move_to_end(key)
       self._hit_count += 1
       return value.value()
     # All the code for when loading needs to happen.
   
   It's up to you.
   



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +141,104 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
+      self._cache.move_to_end(key)

Review Comment:
   What I was asking about earlier was this line here. I'm just validating that we do want peek to modify the cache by counting a peek as a hit.





[GitHub] [beam] lukecwik commented on pull request #23046: [#23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1238451404

   R: @ryanthompson591 




[GitHub] [beam] ryanthompson591 commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966300851


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297

Review Comment:
   Is this now resolved with this PR?



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297
-      materialized = cached_value = (
-          self._partially_cached_iterable(state_key, coder))
-      if isinstance(materialized, (list, self.ContinuationIterable)):
-        self._state_cache.put(cache_state_key, cache_token, materialized)
-      else:
-        _LOGGER.error(
-            "Uncacheable type %s for key %s. Not caching.",
-            materialized,
-            state_key)
-    return cached_value
+    return self._state_cache.get(
+        (cache_state_key, cache_token),

Review Comment:
   I'm just curious about why we need a state key and a token to retrieve data from the cache. Isn't a token adequate?



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,7 +1149,7 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
+    cached_value = self._state_cache.peek((cache_state_key, cache_token))

Review Comment:
   Are the extra parentheses there because peek wants a tuple?



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -87,18 +88,46 @@ def get_referents_for_cache(*objs):
   return rval
 
 
+class _LoadingValue(WeightedValue):
+  """Allows concurrent users of the cache to wait for a value to be loaded."""
+  def __init__(self):
+    # type: () -> None
+    super().__init__(None, 1)
+    self._wait_event = threading.Event()
+
+  def load(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> None
+    try:
+      self._value = loading_fn(key)
+    except Exception as err:
+      self._error = err
+    finally:
+      self._wait_event.set()
+
+  def value(self):
+    # type: () -> Any
+    self._wait_event.wait()
+    err = getattr(self, "_error", None)
+    if err:
+      raise err
+    return self._value
+
+
 class StateCache(object):
   """Cache for Beam state access, scoped by state key and cache_token.
      Assumes a bag state implementation.
 
-  For a given state_key and cache_token, caches a value and allows to
-    a) read from the cache (get),
-           if the currently stored cache_token matches the provided
-    b) write to the cache (put),
-           storing the new value alongside with a cache token
-    c) empty a cached element (clear),
-           if the currently stored cache_token matches the provided
+  For a given key, caches a value and allows to
+    a) peek at the cache (peek),

Review Comment:
   Is it worth documenting that peek doesn't just check whether the value is there but also moves the value to the head of the cache?
   
   That's the behavior we want with peek, right?



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297
-      materialized = cached_value = (
-          self._partially_cached_iterable(state_key, coder))
-      if isinstance(materialized, (list, self.ContinuationIterable)):
-        self._state_cache.put(cache_state_key, cache_token, materialized)
-      else:
-        _LOGGER.error(

Review Comment:
   We are removing this. Are there no longer cache misses we need to worry about? How are those handled?



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -181,12 +263,17 @@ def describe_stats(self):
         hit_ratio = 100.0 * self._hit_count / request_count
       else:
         hit_ratio = 100.0
-      return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
-          self._current_weight >> 20,
-          self._max_weight >> 20,
-          hit_ratio,
-          request_count,
-          self._evict_count)
+      return (
+          'used/max %d/%d MB, hit %.2f%%, lookups %d, '
+          'avg load time %.0f ns, loads %d, evictions %d') % (
+              self._current_weight >> 20,
+              self._max_weight >> 20,
+              hit_ratio,
+              request_count,
+              self._load_time_ns /
+              self._load_count if self._load_count > 0 else 0,

Review Comment:
   This is just to prevent a divide by 0 right?
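A small sketch of the guard in isolation. `average_load_time_ns` is a hypothetical helper name; the expression itself mirrors the one in the hunk above.

```python
def average_load_time_ns(load_time_ns: int, load_count: int) -> float:
    # The conditional expression binds more loosely than "/", so this reads
    # as (load_time_ns / load_count) if load_count > 0 else 0.
    return load_time_ns / load_count if load_count > 0 else 0


# With no loads recorded yet, the stats line still formats cleanly.
stats = 'avg load time %.0f ns, loads %d' % (average_load_time_ns(0, 0), 0)
```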



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:

Review Comment:
   If the lock is held because we are waiting on a load, will peek block? Is that what we want?
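A sketch of the locking shape in the `get` hunk, under simplifying assumptions: module-level functions, a plain dict, and a hypothetical `LOADING` marker replace the real classes. The lock is released before the slow load starts, and a lookup that finds an in-flight load reports a miss instead of waiting.

```python
import threading

lock = threading.RLock()
cache = {}
LOADING = object()  # hypothetical marker for an in-flight load
load_started = threading.Event()
finish_load = threading.Event()


def get(key):
    with lock:
        cache[key] = LOADING
    # The lock is released here, before the slow load begins.
    load_started.set()
    finish_load.wait()  # stands in for a slow loading function
    with lock:
        cache[key] = "loaded"


def peek(key):
    with lock:
        value = cache.get(key)
        # An in-flight load is reported as a miss rather than waited on.
        return None if value is LOADING else value


loader = threading.Thread(target=get, args=("k",))
loader.start()
load_started.wait()
during = peek("k")  # returns immediately while the load is in flight
finish_load.set()
loader.join()
after = peek("k")
```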



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1175,29 +1162,21 @@ def extend(
     # type: (...) -> _Future
     cache_token = self._get_cache_token(state_key)
     if cache_token:
-      # Update the cache
+      # Update the cache if the value is already present and
+      # can be updated.
       cache_key = self._convert_to_cache_key(state_key)
-      cached_value = self._state_cache.get(cache_key, cache_token)
-      # Keep in mind that the state for this key can be evicted
-      # while executing this function. Either read or write to the cache
-      # but never do both here!
-      if cached_value is None:
-        # We have never cached this key before, first retrieve state
-        cached_value = self.blocking_get(state_key, coder)
-      # Just extend the already cached value
+      cached_value = self._state_cache.peek((cache_key, cache_token))

Review Comment:
   So is the purpose of peek vs. get that get has some sort of effect on the cache and peek has none?
   
   I guess I'm not sure why we are peeking. Shouldn't the fact that we peek mean we are interested in updating the state of the data we looked at in the cache?
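A sketch of the distinction as the `extend` hunk above appears to use it, with hypothetical module-level functions and a plain dict. `peek` only returns what is already cached, while `get` loads on a miss, so `extend` updates a cached value in place without ever triggering a load.

```python
cache = {}
loads = []


def peek(key):
    # Returns the cached value or None; never triggers a load.
    return cache.get(key)


def get(key, loading_fn):
    # Loads and stores the value on a miss.
    if key not in cache:
        cache[key] = loading_fn(key)
    return cache[key]


def load(key):
    loads.append(key)
    return [1]


def extend(key, items):
    cached = peek(key)
    if cached is not None:
        # Only an already cached list is updated in place.
        cached.extend(items)


extend("k", [2])  # nothing cached, nothing loaded
get("k", load)    # miss: loads [1]
extend("k", [2])  # cached value becomes [1, 2]
```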



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -181,12 +263,17 @@ def describe_stats(self):
         hit_ratio = 100.0 * self._hit_count / request_count
       else:
         hit_ratio = 100.0
-      return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
-          self._current_weight >> 20,
-          self._max_weight >> 20,
-          hit_ratio,
-          request_count,
-          self._evict_count)
+      return (
+          'used/max %d/%d MB, hit %.2f%%, lookups %d, '
+          'avg load time %.0f ns, loads %d, evictions %d') % (

Review Comment:
   At first when I read `loads`, I thought it meant load time in seconds. Maybe `num_loads` or `total_loads`?



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
       self._cache.move_to_end(key)
       self._hit_count += 1
-      return value.value()
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()
+      self._cache[key] = loading_value
 
-  def put(self, state_key, cache_token, value):
-    # type: (bytes, Optional[bytes], Any) -> None
-    assert cache_token and self.is_cache_enabled()
+      # Ensure that we unlock the lock while loading to allow for parallel gets
+      self._lock.release()
+
+      start_time_ns = time.time_ns()
+      loading_value.load(key, loading_fn)
+      elapsed_time_ns = time.time_ns() - start_time_ns
+
+      # Replace the value in the cache with a weighted value now that the
+      # loading has completed successfully.
+      value = loading_value.value()
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(
+            'Expected object size to be >= 0 for %s but received %d.',
+            value,
+            weight)
+        weight = 8
+      value = WeightedValue(value, weight)
+      with self._lock:
+        self._load_count += 1
+        self._load_time_ns += elapsed_time_ns
+        # Don't replace values that have already been replaced with a different
+        # value by a put/invalidate that occurred concurrently with the load.
+        # The put/invalidate will have been responsible for updating the
+        # cache weight appropriately already.
+        old_value = self._cache.get(key, None)
+        if old_value is not loading_value:
+          return value.value()
+
+        self._current_weight -= loading_value.weight()
+        self._cache[key] = value
+        self._current_weight += value.weight()
+        while self._current_weight > self._max_weight:
+          (_, weighted_value) = self._cache.popitem(last=False)
+          self._current_weight -= weighted_value.weight()
+          self._evict_count += 1
+
+    else:
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+      self._lock.release()
+
+    return value.value()

Review Comment:
   This assumes the value is either in the cache or the load function will return something we can use. Is there a case where the load function could fail and we should do something different?
   
   If I'm correct, the value method raises if there was an exception. Is there a case where the load function could no-op, leaving us with strange values here?



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -87,18 +88,46 @@ def get_referents_for_cache(*objs):
   return rval
 
 
+class _LoadingValue(WeightedValue):
+  """Allows concurrent users of the cache to wait for a value to be loaded."""
+  def __init__(self):
+    # type: () -> None
+    super().__init__(None, 1)
+    self._wait_event = threading.Event()
+
+  def load(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> None
+    try:
+      self._value = loading_fn(key)
+    except Exception as err:

Review Comment:
   What kind of exceptions should we expect? Can we be more specific about which exceptions we catch?
   
   Oh, I see we are raising this exception below in the value method.
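
To make the capture-then-raise pattern discussed here concrete, the following is a minimal sketch, not the PR's `_LoadingValue`. The class name `LoadingSlot` and its `_error` field are assumptions, since the body of the `except` clause is not visible in the quoted hunk. The idea is that `load` records any failure and `value` raises it for whoever asks, so a waiter never sees a half-initialized value.

```python
import threading


class LoadingSlot:
    """Hypothetical holder for a value that is loaded exactly once."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None
        self._error = None  # assumed field; stores the loader's exception

    def load(self, key, loading_fn):
        try:
            self._value = loading_fn(key)
        except Exception as err:
            # Remember the failure instead of letting it escape here, so
            # that every caller of value() observes the same error.
            self._error = err
        finally:
            # Wake up waiters whether the load succeeded or failed.
            self._event.set()

    def value(self):
        self._event.wait()
        if self._error is not None:
            raise self._error
        return self._value
```

Catching the broad `Exception` is tolerable in this shape because nothing is swallowed: the error is deferred to `value()` rather than dropped.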



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
       self._cache.move_to_end(key)
       self._hit_count += 1
-      return value.value()
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()

Review Comment:
   I'm just trying to go through the possible calls to the method in my head to determine whether the lock could be released while the value at cache[key] is still a _LoadingValue.
   
   Do we need any logic to handle that? I think if we have strong tests to prevent this kind of race condition we should be fine.



##########
sdks/python/apache_beam/runners/worker/statecache_test.py:
##########
@@ -19,144 +19,239 @@
 # pytype: skip-file
 
 import logging
+import re
+import threading
+import time
 import unittest
 
+from hamcrest import assert_that
+from hamcrest import contains_string
+
 from apache_beam.runners.worker.statecache import CacheAware
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.statecache import WeightedValue
 
 
 class StateCacheTest(unittest.TestCase):
-  def test_empty_cache_get(self):
+  def test_empty_cache_peek(self):
     cache = StateCache(5 << 20)
-    self.assertEqual(cache.get("key", 'cache_token'), None)
-    with self.assertRaises(Exception):
-      # Invalid cache token provided
-      self.assertEqual(cache.get("key", None), None)
+    self.assertEqual(cache.peek("key"), None)
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 0/5 MB, hit 0.00%, lookups 1, evictions 0')
+        (
+            'used/max 0/5 MB, hit 0.00%, lookups 1, '
+            'avg load time 0 ns, loads 0, evictions 0'))
 
-  def test_put_get(self):
+  def test_put_peek(self):
     cache = StateCache(5 << 20)
-    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key", WeightedValue("value", 1 << 20))
     self.assertEqual(cache.size(), 1)
-    self.assertEqual(cache.get("key", "cache_token"), "value")
-    self.assertEqual(cache.get("key", "cache_token2"), None)
-    with self.assertRaises(Exception):
-      self.assertEqual(cache.get("key", None), None)
-    self.assertEqual(
-        cache.describe_stats(),
-        'used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0')
-
-  def test_clear(self):
-    cache = StateCache(5 << 20)
-    cache.clear("new-key", "cache_token")
-    cache.put("key", "cache_token", WeightedValue(["value"], 1 << 20))
-    self.assertEqual(cache.size(), 2)
-    self.assertEqual(cache.get("new-key", "new_token"), None)
-    self.assertEqual(cache.get("key", "cache_token"), ['value'])
-    # test clear without existing key/token
-    cache.clear("non-existing", "token")
-    self.assertEqual(cache.size(), 3)
-    self.assertEqual(cache.get("non-existing", "token"), [])
+    self.assertEqual(cache.peek("key"), "value")
+    self.assertEqual(cache.peek("key2"), None)
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 1/5 MB, hit 66.67%, lookups 3, evictions 0')
+        (
+            'used/max 1/5 MB, hit 50.00%, lookups 2, '
+            'avg load time 0 ns, loads 0, evictions 0'))
 
   def test_default_sized_put(self):
     cache = StateCache(5 << 20)
-    cache.put("key", "cache_token", bytearray(1 << 20))
-    cache.put("key2", "cache_token", bytearray(1 << 20))
-    cache.put("key3", "cache_token", bytearray(1 << 20))
-    self.assertEqual(cache.get("key3", "cache_token"), bytearray(1 << 20))
-    cache.put("key4", "cache_token", bytearray(1 << 20))
-    cache.put("key5", "cache_token", bytearray(1 << 20))
+    cache.put("key", bytearray(1 << 20))
+    cache.put("key2", bytearray(1 << 20))
+    cache.put("key3", bytearray(1 << 20))
+    self.assertEqual(cache.peek("key3"), bytearray(1 << 20))
+    cache.put("key4", bytearray(1 << 20))
+    cache.put("key5", bytearray(1 << 20))
     # note that each byte array instance takes slightly over 1 MB which is why
     # these 5 byte arrays can't all be stored in the cache causing a single
     # eviction
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 4/5 MB, hit 100.00%, lookups 1, evictions 1')
+        (
+            'used/max 4/5 MB, hit 100.00%, lookups 1, '
+            'avg load time 0 ns, loads 0, evictions 1'))
 
   def test_max_size(self):
     cache = StateCache(2 << 20)
-    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
-    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
+    cache.put("key", WeightedValue("value", 1 << 20))
+    cache.put("key2", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
+    cache.put("key3", WeightedValue("value3", 1 << 20))
     self.assertEqual(cache.size(), 2)
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 2/2 MB, hit 100.00%, lookups 0, evictions 1')
+        (
+            'used/max 2/2 MB, hit 100.00%, lookups 0, '
+            'avg load time 0 ns, loads 0, evictions 1'))
 
   def test_invalidate_all(self):
     cache = StateCache(5 << 20)
-    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
-    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
+    cache.put("key", WeightedValue("value", 1 << 20))
+    cache.put("key2", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
     cache.invalidate_all()
     self.assertEqual(cache.size(), 0)
-    self.assertEqual(cache.get("key", "cache_token"), None)
-    self.assertEqual(cache.get("key2", "cache_token"), None)
+    self.assertEqual(cache.peek("key"), None)
+    self.assertEqual(cache.peek("key2"), None)
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 0/5 MB, hit 0.00%, lookups 2, evictions 0')
+        (
+            'used/max 0/5 MB, hit 0.00%, lookups 2, '
+            'avg load time 0 ns, loads 0, evictions 0'))
 
   def test_lru(self):
     cache = StateCache(5 << 20)
-    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
-    cache.put("key2", "cache_token2", WeightedValue("value2", 1 << 20))
-    cache.put("key3", "cache_token", WeightedValue("value0", 1 << 20))
-    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
-    cache.put("key4", "cache_token4", WeightedValue("value4", 1 << 20))
-    cache.put("key5", "cache_token", WeightedValue("value0", 1 << 20))
-    cache.put("key5", "cache_token", WeightedValue(["value5"], 1 << 20))
+    cache.put("key", WeightedValue("value", 1 << 20))
+    cache.put("key2", WeightedValue("value2", 1 << 20))
+    cache.put("key3", WeightedValue("value0", 1 << 20))
+    cache.put("key3", WeightedValue("value3", 1 << 20))
+    cache.put("key4", WeightedValue("value4", 1 << 20))
+    cache.put("key5", WeightedValue("value0", 1 << 20))
+    cache.put("key5", WeightedValue(["value5"], 1 << 20))
     self.assertEqual(cache.size(), 5)
-    self.assertEqual(cache.get("key", "cache_token"), "value")
-    self.assertEqual(cache.get("key2", "cache_token2"), "value2")
-    self.assertEqual(cache.get("key3", "cache_token"), "value3")
-    self.assertEqual(cache.get("key4", "cache_token4"), "value4")
-    self.assertEqual(cache.get("key5", "cache_token"), ["value5"])
+    self.assertEqual(cache.peek("key"), "value")
+    self.assertEqual(cache.peek("key2"), "value2")
+    self.assertEqual(cache.peek("key3"), "value3")
+    self.assertEqual(cache.peek("key4"), "value4")
+    self.assertEqual(cache.peek("key5"), ["value5"])
     # insert another key to trigger cache eviction
-    cache.put("key6", "cache_token2", WeightedValue("value6", 1 << 20))
+    cache.put("key6", WeightedValue("value6", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key")
-    self.assertEqual(cache.get("key", "cache_token"), None)
+    self.assertEqual(cache.peek("key"), None)
     # trigger a read on "key2"
-    cache.get("key2", "cache_token2")
+    cache.peek("key2")
     # insert another key to trigger cache eviction
-    cache.put("key7", "cache_token", WeightedValue("value7", 1 << 20))
+    cache.put("key7", WeightedValue("value7", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key3")
-    self.assertEqual(cache.get("key3", "cache_token"), None)
-    # trigger a put on "key2"
-    cache.put("key2", "cache_token", WeightedValue("put", 1 << 20))
+    self.assertEqual(cache.peek("key3"), None)
+    # insert another key to trigger cache eviction
+    cache.put("key8", WeightedValue("put", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # insert another key to trigger cache eviction
-    cache.put("key8", "cache_token", WeightedValue("value8", 1 << 20))
+    cache.put("key9", WeightedValue("value8", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key4")
-    self.assertEqual(cache.get("key4", "cache_token"), None)
+    self.assertEqual(cache.peek("key4"), None)
     # make "key5" used by writing to it
-    cache.put("key5", "cache_token", WeightedValue("val", 1 << 20))
+    cache.put("key5", WeightedValue("val", 1 << 20))
     # least recently used key should be gone ("key6")
-    self.assertEqual(cache.get("key6", "cache_token"), None)
+    self.assertEqual(cache.peek("key6"), None)
     self.assertEqual(
         cache.describe_stats(),
-        'used/max 5/5 MB, hit 60.00%, lookups 10, evictions 5')
+        (
+            'used/max 5/5 MB, hit 60.00%, lookups 10, '
+            'avg load time 0 ns, loads 0, evictions 5'))
+
+  def test_get(self):
+    def check_key(key):
+      self.assertEqual(key, "key")
+      time.sleep(0.5)
+      return "value"
+
+    cache = StateCache(5 << 20)
+    self.assertEqual("value", cache.get("key", check_key))
+    self.assertEqual("value", cache.peek("key"))
+    cache.invalidate_all()
+    self.assertEqual("value", cache.get("key", check_key))
+    self.assertEqual("value", cache.peek("key"))
+
+    assert_that(cache.describe_stats(), contains_string(", loads 2,"))
+    load_time_ns = re.search(
+        ", avg load time (.+) ns,", cache.describe_stats()).group(1)
+    # Load time should be larger than the sleep time and less than 2x sleep time
+    self.assertGreater(int(load_time_ns), 0.5 * 1_000_000_000)
+    self.assertLess(int(load_time_ns), 1_000_000_000)
+
+  def test_concurrent_get_waits(self):

Review Comment:
   cool test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966393742


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297

Review Comment:
   `get` is now responsible for looking up the value; if it is not there, it loads it using the supplied lambda.
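
As a rough illustration of that read-through shape, here is a single-threaded sketch. `ReadThroughCache`, `fetch_from_runner`, and this simplified `blocking_get` are hypothetical stand-ins, not the code in `sdk_worker.py`, and the sketch leaves out locking, weights, and eviction.

```python
class ReadThroughCache:
    """Hypothetical cache whose get() loads missing values itself."""

    def __init__(self):
        self._entries = {}
        self.loads = 0

    def get(self, key, loading_fn):
        if key not in self._entries:
            # Cache miss: the cache, not the caller, invokes the loader.
            self.loads += 1
            self._entries[key] = loading_fn(key)
        return self._entries[key]


def fetch_from_runner(state_key):
    # Stand-in for the round trip to the runner.
    return [state_key]


def blocking_get(cache, state_key, cache_token):
    # The caller only describes how to load; it no longer checks for a
    # miss and writes the result back on its own.
    return cache.get(
        (state_key, cache_token), lambda key: fetch_from_runner(key[0]))
```

With this shape the "check, fetch, then put" sequence lives in one place, which is what lets the real cache coordinate concurrent loads of the same key.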





[GitHub] [beam] lukecwik commented on pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1248586094

   Yes, I got sidetracked and was unable to push up the latest changes.




[GitHub] [beam] lukecwik merged pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #23046:
URL: https://github.com/apache/beam/pull/23046




[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966414829


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:

Review Comment:
   We check specifically that it is a `_LoadingValue` on line 156 and return None immediately, and yes, that is what we want. `peek` should not block.
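
A minimal sketch of why such a `peek` stays non-blocking, assuming a hypothetical `TinyCache` rather than the real `StateCache`: the lock guards only dictionary access, and the loader runs after the lock is released. The placeholder class `_Loading` is an illustrative name, and unlike the PR this sketch does not make concurrent `get` calls for the same key wait on a single load.

```python
import threading


class _Loading:
    """Hypothetical placeholder marking a key whose load is in flight."""


class TinyCache:
    def __init__(self):
        self._lock = threading.RLock()
        self._entries = {}

    def peek(self, key):
        # The lock is held only for the lookup, never for a load.
        with self._lock:
            entry = self._entries.get(key)
            if entry is None or isinstance(entry, _Loading):
                return None  # absent or still loading: report a miss
            return entry

    def get(self, key, loading_fn):
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None and not isinstance(entry, _Loading):
                return entry
            self._entries[key] = _Loading()
        # The lock is released here, so peek() on other threads returns
        # immediately while the loader is still running.
        value = loading_fn(key)
        with self._lock:
            self._entries[key] = value
        return value
```

A thread calling `peek` during a slow load therefore sees a miss instead of waiting for the load to finish.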





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r972394713


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -181,12 +263,17 @@ def describe_stats(self):
         hit_ratio = 100.0 * self._hit_count / request_count
       else:
         hit_ratio = 100.0
-      return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
-          self._current_weight >> 20,
-          self._max_weight >> 20,
-          hit_ratio,
-          request_count,
-          self._evict_count)
+      return (
+          'used/max %d/%d MB, hit %.2f%%, lookups %d, '
+          'avg load time %.0f ns, loads %d, evictions %d') % (

Review Comment:
   This is to match existing statistics collected in Java.
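
For readers following the new fields, this is one way the two counters could be turned into the reported text. It is a sketch only: `describe_load_stats` is a hypothetical helper, and the zero guard is an assumption inferred from the tests in this PR, which expect `avg load time 0 ns, loads 0` before any load has happened.

```python
def describe_load_stats(load_time_ns, load_count):
    # Guard against division by zero when nothing has been loaded yet.
    if load_count > 0:
        avg_load_time_ns = load_time_ns / load_count
    else:
        avg_load_time_ns = 0.0
    return 'avg load time %.0f ns, loads %d' % (avg_load_time_ns, load_count)
```

Here `loads` is a count of completed loads, while the duration is reported separately as an average in nanoseconds.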





[GitHub] [beam] codecov[bot] commented on pull request #23046: [#23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1238473128

   # [Codecov](https://codecov.io/gh/apache/beam/pull/23046?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#23046](https://codecov.io/gh/apache/beam/pull/23046?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (65a8743) into [master](https://codecov.io/gh/apache/beam/commit/723152266978c97a4bbb627f07ab07625128f55a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7231522) will **decrease** coverage by `0.02%`.
   > The diff coverage is `74.07%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #23046      +/-   ##
   ==========================================
   - Coverage   73.67%   73.64%   -0.03%     
   ==========================================
     Files         714      716       +2     
     Lines       95212    95321     +109     
   ==========================================
   + Hits        70150    70202      +52     
   - Misses      23765    23822      +57     
     Partials     1297     1297              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.43% <86.95%> (-0.06%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/23046?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/runtime/harness/harness.go](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9oYXJuZXNzLmdv) | `10.18% <0.00%> (ø)` | |
   | [...ks/python/apache\_beam/runners/worker/statecache.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc3RhdGVjYWNoZS5weQ==) | `87.67% <86.15%> (-2.02%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.09% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `90.97% <0.00%> (-0.76%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `91.71% <0.00%> (-0.31%)` | :arrow_down: |
   | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `82.87% <0.00%> (-0.14%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <0.00%> (-0.13%)` | :arrow_down: |
   | [...am/examples/inference/pytorch\_language\_modeling.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfbGFuZ3VhZ2VfbW9kZWxpbmcucHk=) | `0.00% <0.00%> (ø)` | |
   | [...examples/inference/pytorch\_image\_classification.py](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfaW1hZ2VfY2xhc3NpZmljYXRpb24ucHk=) | `0.00% <0.00%> (ø)` | |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/23046/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] lukecwik commented on pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1248623931

   Latest changes up now. Thanks for pinging me.




[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966417187


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:
       value = self._cache.get(key, None)
-      if value is None:
+      if value is None or isinstance(value, _LoadingValue):
         self._miss_count += 1
         return None
+
       self._cache.move_to_end(key)
       self._hit_count += 1
-      return value.value()
+    return value.value()
+
+  def get(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> Any
+    assert self.is_cache_enabled() and callable(loading_fn)
+
+    self._lock.acquire()
+    value = self._cache.get(key, None)
+    if value is None:
+      self._miss_count += 1
+      loading_value = _LoadingValue()

Review Comment:
   I updated the test to ensure that the value after the load isn't a `_LoadingValue`





[GitHub] [beam] github-actions[bot] commented on pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23046:
URL: https://github.com/apache/beam/pull/23046#issuecomment-1238662455

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966392922


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,7 +1149,7 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
+    cached_value = self._state_cache.peek((cache_state_key, cache_token))

Review Comment:
   I swapped the API to use a single key field, since the internal implementation of the cache shouldn't care how the key is structured. Clients now pass in a tuple that represents the key.
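A minimal sketch of the single-key idea, assuming a plain `OrderedDict` as the backing store. The helper `make_key` and the token values are hypothetical and only illustrate that the client, not the cache, decides what goes into a key.

```python
import collections

# The cache only requires that keys are hashable; it never inspects them.
cache = collections.OrderedDict()


def make_key(state_key, cache_token):
  # Hypothetical client-side helper: bundle both parts into one tuple key.
  return (state_key, cache_token)


cache[make_key(b"user_state", b"token-1")] = ["a"]
cache[make_key(b"user_state", b"token-2")] = ["a", "b"]

# The same state key under different cache tokens maps to different entries.
stale = cache.get(make_key(b"user_state", b"token-1"))
fresh = cache.get(make_key(b"user_state", b"token-2"))
missing = cache.get(make_key(b"user_state", b"token-3"))
```

Because the key is opaque to the cache, a caller could later change what the tuple contains without touching the cache implementation.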





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966412972


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -87,18 +88,46 @@ def get_referents_for_cache(*objs):
   return rval
 
 
+class _LoadingValue(WeightedValue):
+  """Allows concurrent users of the cache to wait for a value to be loaded."""
+  def __init__(self):
+    # type: () -> None
+    super().__init__(None, 1)
+    self._wait_event = threading.Event()
+
+  def load(self, key, loading_fn):
+    # type: (Any, Callable[[Any], Any]) -> None
+    try:
+      self._value = loading_fn(key)
+    except Exception as err:

Review Comment:
   You're right. We catch all exceptions, and anyone who was blocked will get the exception during the call to `value`.
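The behavior described in this comment can be sketched roughly as follows. This is an illustration under assumptions, not the code from the PR: `LoadingValueSketch`, its `_error` field, and `failing_loader` are hypothetical names, and only the catch-then-rethrow-in-`value` pattern is taken from the discussion.

```python
import threading


class LoadingValueSketch:
  """Hypothetical stand-in for the PR's _LoadingValue; not the Beam class."""
  def __init__(self):
    self._value = None
    self._error = None
    self._wait_event = threading.Event()

  def load(self, key, loading_fn):
    try:
      self._value = loading_fn(key)
    except Exception as err:
      # Remember the failure so that blocked callers see it too.
      self._error = err
    finally:
      # Always wake up waiters, whether the load succeeded or failed.
      self._wait_event.set()

  def value(self):
    self._wait_event.wait()
    if self._error is not None:
      raise self._error
    return self._value


def failing_loader(key):
  raise ValueError("cannot load %r" % (key,))


pending = LoadingValueSketch()
waiter_results = []


def waiter():
  # Blocks in value() until load() finishes, then records what it observed.
  try:
    waiter_results.append(pending.value())
  except ValueError as err:
    waiter_results.append(err)


thread = threading.Thread(target=waiter)
thread.start()
pending.load("k", failing_loader)
thread.join()

# A successful load hands the loaded value to every caller of value().
ok = LoadingValueSketch()
ok.load("k", lambda key: key.upper())
```

Setting the event in `finally` is the important part of the sketch: without it, a failed load would leave waiting threads blocked forever.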





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r972387375


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1149,22 +1149,9 @@ def blocking_get(
       return self._lazy_iterator(state_key, coder)
     # Cache lookup
     cache_state_key = self._convert_to_cache_key(state_key)
-    cached_value = self._state_cache.get(cache_state_key, cache_token)
-    if cached_value is None:
-      # Cache miss, need to retrieve from the Runner
-      # Further size estimation or the use of the continuation token on the
-      # runner side could fall back to materializing one item at a time.
-      # https://jira.apache.org/jira/browse/BEAM-8297
-      materialized = cached_value = (
-          self._partially_cached_iterable(state_key, coder))
-      if isinstance(materialized, (list, self.ContinuationIterable)):
-        self._state_cache.put(cache_state_key, cache_token, materialized)
-      else:
-        _LOGGER.error(

Review Comment:
   They aren't misses. This is handled by `get` now, since it will load the value if it is not present.
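As a rough illustration of what "`get` loads the value if not present" means for callers, here is a simplified loading cache. `TinyLoadingCache`, `load_from_runner`, and the public counter names are assumptions made for this sketch. Unlike the PR, it runs the loader while holding the lock and does no weight-based eviction.

```python
import collections
import threading
import time


class TinyLoadingCache:
  """Illustrative only; not Beam's StateCache."""
  def __init__(self):
    self._cache = collections.OrderedDict()
    self._lock = threading.RLock()
    self.hit_count = 0
    self.miss_count = 0
    self.load_count = 0
    self.load_time_ns = 0

  def get(self, key, loading_fn):
    with self._lock:
      if key in self._cache:
        self._cache.move_to_end(key)  # mark as most recently used
        self.hit_count += 1
        return self._cache[key]
      self.miss_count += 1
      start = time.monotonic_ns()
      # Simplification: the loader runs while the lock is held.
      value = loading_fn(key)
      self.load_time_ns += time.monotonic_ns() - start
      self.load_count += 1
      self._cache[key] = value
      return value


calls = []


def load_from_runner(key):
  # Hypothetical loader standing in for a state request to the runner.
  calls.append(key)
  return ["state for", key]


cache = TinyLoadingCache()
key = (b"state_key", b"cache_token")
first = cache.get(key, load_from_runner)
second = cache.get(key, load_from_runner)
```

With this shape, the caller no longer needs its own "check, fetch, then put" sequence; the second `get` is served from the cache without invoking the loader.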





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966408485


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1175,29 +1162,21 @@ def extend(
     # type: (...) -> _Future
     cache_token = self._get_cache_token(state_key)
     if cache_token:
-      # Update the cache
+      # Update the cache if the value is already present and
+      # can be updated.
       cache_key = self._convert_to_cache_key(state_key)
-      cached_value = self._state_cache.get(cache_key, cache_token)
-      # Keep in mind that the state for this key can be evicted
-      # while executing this function. Either read or write to the cache
-      # but never do both here!
-      if cached_value is None:
-        # We have never cached this key before, first retrieve state
-        cached_value = self.blocking_get(state_key, coder)
-      # Just extend the already cached value
+      cached_value = self._state_cache.peek((cache_key, cache_token))

Review Comment:
   Yes, `peek` doesn't modify the cache and returns the value or None if not present. `get` returns the value (loading if necessary).
   
   We are peeking because we want to support blind writes. This allows us to append to the bag without needing to load it from the runner. As an optimization, we also append to the cached in-memory version if it is fully loaded, since we know that it will stay consistent with what the runner would have provided to us.
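The blind-write pattern described here might look like the sketch below. Everything in it is an assumption for illustration: `PeekableCache`, the `runner_state` dict standing in for the runner, and the use of `list` as the marker for a fully loaded value.

```python
class PeekableCache:
  """Illustrative cache with the two access styles discussed above."""
  def __init__(self):
    self._cache = {}

  def peek(self, key):
    # Never loads: returns the cached value, or None if absent.
    return self._cache.get(key)

  def get(self, key, loading_fn):
    # Loads on a miss, then returns the value.
    if key not in self._cache:
      self._cache[key] = loading_fn(key)
    return self._cache[key]


# Stand-in for bag state held by the runner.
runner_state = {b"bag": [1, 2]}


def load(key):
  state_key, _cache_token = key
  return list(runner_state[state_key])  # copy, as if fetched remotely


def extend(cache, state_key, cache_token, elements):
  cached = cache.peek((state_key, cache_token))
  if isinstance(cached, list):
    # Optimization: keep an already loaded copy consistent.
    cached.extend(elements)
  # Blind write: append on the runner side without reading first.
  runner_state[state_key].extend(elements)


cache = PeekableCache()
key = (b"bag", b"token")

extend(cache, b"bag", b"token", [3])        # nothing cached, nothing loaded
peek_before_load = cache.peek(key)
loaded_snapshot = list(cache.get(key, load))
extend(cache, b"bag", b"token", [4])        # cached copy is updated as well
cached_after = cache.peek(key)
```

The first `extend` never triggers a load, which is the point of using `peek` rather than `get` on the write path.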





[GitHub] [beam] lukecwik commented on a diff in pull request #23046: [fixes #23000] Update the Python SDK harness state cache to be a loading cache

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966414829


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -111,28 +140,86 @@ def __init__(self, max_weight):
     self._max_weight = max_weight
     self._current_weight = 0
     self._cache = collections.OrderedDict(
-    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    )  # type: collections.OrderedDict[Any, WeightedValue]
     self._hit_count = 0
     self._miss_count = 0
     self._evict_count = 0
+    self._load_time_ns = 0
+    self._load_count = 0
     self._lock = threading.RLock()
 
-  def get(self, state_key, cache_token):
-    # type: (bytes, Optional[bytes]) -> Any
-    assert cache_token and self.is_cache_enabled()
-    key = (state_key, cache_token)
+  def peek(self, key):
+    # type: (Any) -> Any
+    assert self.is_cache_enabled()
     with self._lock:

Review Comment:
   We check specifically that it is a `_LoadingValue` on line 156 and return `None` immediately.


