You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/27 23:59:07 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23391: Update state cache to not fail when measuring object sizes.

TheNeuralBit commented on code in PR #23391:
URL: https://github.com/apache/beam/pull/23391#discussion_r981822093


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -69,24 +87,100 @@ def get_referents_for_cache(self):
     raise NotImplementedError()
 
 
-def get_referents_for_cache(*objs):
+def _safe_isinstance(obj, type):
+  # type: (Any, Union[type, Tuple[type, ...]]) -> bool
+
+  """
+  Return whether an object is an instance of a class or of a subclass thereof.
+  See `isinstance()` for more information.
+
+  Returns false on `isinstance()` failure. For example applying `isinstance()`
+  on `weakref.proxy` objects attempts to dereference the proxy objects, which
+  may yield an exception. See https://github.com/apache/beam/issues/23389 for
+  additional details.
+  """
+  try:
+    return isinstance(obj, type)
+  except Exception:
+    return False
+
+
+def _size_func(obj):
+  # type: (Any) -> int
+
+  """
+  Returns the size of the object or a default size if an error occurred during
+  sizing.
+  """
+  try:
+    return sys.getsizeof(obj)
+  except Exception as e:
+    current_time = time.time()
+    # Limit outputting this log so we don't spam the logs on these
+    # occurrences.
+    if _size_func.last_log_time + 300 < current_time:  # type: ignore
+      _LOGGER.warning(
+          'Failed to size %s of type %s. Note that this may '
+          'impact cache sizing such that the cache is over '
+          'utilized which may lead to out of memory errors.',
+          obj,
+          type(obj),
+          exc_info=e)
+      _size_func.last_log_time = current_time  # type: ignore
+    # Use an arbitrary default size that would account for some of the object
+    # overhead.
+    return _DEFAULT_WEIGHT
+
+
+_size_func.last_log_time = 0  # type: ignore
+
+
+def _get_referents_func(*objs):
   # type: (List[Any]) -> List[Any]
 
   """Returns the list of objects accounted during cache measurement.
 
-  Users can inherit CacheAware to override which referrents should be
+  Users can inherit CacheAware to override which referents should be
   used when measuring the deep size of the object. The default is to
   use gc.get_referents(*objs).
   """
   rval = []
   for obj in objs:
-    if isinstance(obj, CacheAware):
-      rval.extend(obj.get_referents_for_cache())
+    if _safe_isinstance(obj, CacheAware):
+      rval.extend(obj.get_referents_for_cache())  # type: ignore
     else:
       rval.extend(gc.get_referents(obj))
   return rval
 
 
+def _filter_func(o):
+  # type: (Any) -> bool
+
+  """
+  Filter out specific types from being measured.
+
+  Note that we do want to measure the cost of weak references as they will only
+  stay in scope as long as other code references them and will effectively be
+  garbage collected as soon as there isn't a strong reference anymore.
+
+  Note that we cannot use the default filter function due to isinstance raising
+  an error on weakref.proxy types. See
+  https://github.com/liran-funaro/objsize/issues/6 for additional details.

Review Comment:
  The issue indicates there was a release that addresses it; I'm assuming that's not sufficient for our use case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org