Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/26 20:52:35 UTC

[GitHub] [beam] lukecwik opened a new pull request, #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

lukecwik opened a new pull request, #22924:
URL: https://github.com/apache/beam/pull/22924

   This relies on measuring the deep size of cached objects. `objsize` seemed like a more appropriate library to depend on than the larger and more complex `pympler` library.
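
A deep size counts an object plus everything reachable from it. The sketch below approximates that idea using only the standard library (`sys.getsizeof` and `gc.get_referents`). It is not the `objsize` implementation, and the helper name `approx_deep_size` is hypothetical.

```python
import gc
import sys
import types

# Assumption: classes, modules and functions are shared program structure,
# so this sketch does not charge them to the object being measured.
_EXCLUDED_TYPES = (type, types.ModuleType, types.FunctionType)


def approx_deep_size(obj):
    """Sum sys.getsizeof over obj and every object reachable from it."""
    seen = set()  # ids of objects that were already counted
    stack = [obj]
    total = 0
    while stack:
        current = stack.pop()
        if id(current) in seen or isinstance(current, _EXCLUDED_TYPES):
            continue
        seen.add(id(current))
        total += sys.getsizeof(current)
        # gc.get_referents returns the objects directly referenced by current.
        stack.extend(gc.get_referents(current))
    return total


shared = "y" * 1000
pair = [shared, shared]
# The shared string is reachable twice but is only counted once.
pair_size = approx_deep_size(pair)
```

Because each object is visited once, values referenced from several places are not double counted, which matters when a cache budget is expressed in bytes.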
   
   We remove the state cache metrics in favor of plugging into the status client output, since the state cache metrics are not defined in monitoring.proto as a well-defined type. In the future we may reintroduce a similar set of metrics.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233336546

   @tweise note the removal of the metrics; do you see this as an issue?




[GitHub] [beam] github-actions[bot] commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1229041454

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r957751000


##########
sdks/python/setup.py:
##########
@@ -223,6 +223,7 @@ def get_portability_package_data():
         'hdfs>=2.1.0,<3.0.0',
         'httplib2>=0.8,<0.21.0',
         'numpy>=1.14.3,<1.23.0',
+        'objsize>=0.5.1,<1'

Review Comment:
   Thanks a bunch.





[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233335552

   R: @tweise @ryanthompson




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962020219


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -20,245 +20,180 @@
 # mypy: disallow-untyped-defs
 
 import collections
+import gc
 import logging
 import threading
-from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Hashable
 from typing import List
 from typing import Optional
-from typing import Set
 from typing import Tuple
-from typing import TypeVar
 
-from apache_beam.metrics import monitoring_infos
-
-if TYPE_CHECKING:
-  from apache_beam.portability.api import metrics_pb2
+import objsize
 
 _LOGGER = logging.getLogger(__name__)
 
-CallableT = TypeVar('CallableT', bound='Callable')
-KT = TypeVar('KT')
-VT = TypeVar('VT')
 
+class WeightedValue(object):
+  """Value type that stores corresponding weight.
 
-class Metrics(object):
-  """Metrics container for state cache metrics."""
+  :arg value The value to be stored.
+  :arg weight The associated weight of the value. If unspecified, the object's
+  size will be used.
+  """
+  def __init__(self, value, weight):
+    # type: (Any, int) -> None
+    self._value = value
+    if weight <= 0:
+      raise ValueError(
+          'Expected weight to be > 0 for %s but received %d' % (value, weight))
+    self._weight = weight
+
+  def weight(self):
+    # type: () -> int
+    return self._weight
 
-  # A set of all registered metrics
-  ALL_METRICS = set()  # type: Set[Hashable]
-  PREFIX = "beam:metric:statecache:"
+  def value(self):
+    # type: () -> Any
+    return self._value
 
-  def __init__(self):
-    # type: () -> None
-    self._context = threading.local()
 
-  def initialize(self):
+class CacheAware(object):
+  def __init__(self):
     # type: () -> None
+    pass
 
-    """Needs to be called once per thread to initialize the local metrics cache.
-    """
-    if hasattr(self._context, 'metrics'):
-      return  # Already initialized
-    self._context.metrics = collections.defaultdict(int)
-
-  def count(self, name):
-    # type: (str) -> None
-    self._context.metrics[name] += 1
-
-  def hit_miss(self, total_name, hit_miss_name):
-    # type: (str, str) -> None
-    self._context.metrics[total_name] += 1
-    self._context.metrics[hit_miss_name] += 1
+  def get_referents_for_cache(self):
+    # type: () -> List[Any]
 
-  def get_monitoring_infos(self, cache_size, cache_capacity):
-    # type: (int, int) -> List[metrics_pb2.MonitoringInfo]
+    """Returns the list of objects accounted during cache measurement."""
+    raise NotImplementedError()
 
-    """Returns the metrics scoped to the current bundle."""
-    metrics = self._context.metrics
-    if len(metrics) == 0:
-      # No metrics collected, do not report
-      return []
-    # Add all missing metrics which were not reported
-    for key in Metrics.ALL_METRICS:
-      if key not in metrics:
-        metrics[key] = 0
-    # Gauges which reflect the state since last queried
-    gauges = [
-        monitoring_infos.int64_gauge(self.PREFIX + name, val) for name,
-        val in metrics.items()
-    ]
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'size', cache_size))
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'capacity', cache_capacity))
-    # Counters for the summary across all metrics
-    counters = [
-        monitoring_infos.int64_counter(self.PREFIX + name + '_total', val)
-        for name,
-        val in metrics.items()
-    ]
-    # Reinitialize metrics for this thread/bundle
-    metrics.clear()
-    return gauges + counters
 
-  @staticmethod
-  def counter_hit_miss(total_name, hit_name, miss_name):
-    # type: (str, str, str) -> Callable[[CallableT], CallableT]
+def get_referents_for_cache(*objs):
+  # type: (List[Any]) -> List[Any]
 
-    """Decorator for counting function calls and whether
-       the return value equals None (=miss) or not (=hit)."""
-    Metrics.ALL_METRICS.update([total_name, hit_name, miss_name])
+  """Returns the list of objects accounted during cache measurement.
 
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        value = function(self, *args, **kwargs)
-        if value is None:
-          self._metrics.hit_miss(total_name, miss_name)
-        else:
-          self._metrics.hit_miss(total_name, hit_name)
-        return value
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
-
-  @staticmethod
-  def counter(metric_name):
-    # type: (str) -> Callable[[CallableT], CallableT]
-
-    """Decorator for counting function calls."""
-    Metrics.ALL_METRICS.add(metric_name)
-
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        self._metrics.count(metric_name)
-        return function(self, *args, **kwargs)
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
+  Users can inherit CacheAware to override which referents should be
+  used when measuring the deep size of the object. The default is to
+  use gc.get_referents(*objs).
+  """
+  # print(objs)

Review Comment:
   Done
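
To illustrate the `CacheAware` hook from the diff above, here is a minimal sketch. `CacheAware` and `get_referents_for_cache` follow the code in this PR; the `UserState` class and its fields are hypothetical and only show how a value might keep a shared object out of its measured size.

```python
import gc


class CacheAware(object):
    def get_referents_for_cache(self):
        """Returns the list of objects accounted during cache measurement."""
        raise NotImplementedError()


def get_referents_for_cache(*objs):
    """Uses the CacheAware override when present, else gc.get_referents."""
    rval = []
    for obj in objs:
        if isinstance(obj, CacheAware):
            rval.extend(obj.get_referents_for_cache())
        else:
            rval.extend(gc.get_referents(obj))
    return rval


class UserState(CacheAware):
    """Hypothetical cached value that also points at a shared resource."""
    def __init__(self, elements, shared_resource):
        self._elements = elements
        self._shared_resource = shared_resource

    def get_referents_for_cache(self):
        # Only the elements are charged to the cache; the shared resource
        # outlives the cache entry, so it is deliberately left out.
        return [self._elements]


shared = object()
state = UserState([1, 2, 3], shared)
referents = get_referents_for_cache(state)
```

Objects that do not inherit from `CacheAware` fall back to whatever `gc.get_referents` reports for them.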





[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1230655329

   Run Python PreCommit




[GitHub] [beam] tweise commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
tweise commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233430671

   @lukecwik I'm not working on this anymore, but as a user I would look for metrics that indicate how the cache performs and how it should be tuned. So removing the metrics without a replacement looks like a step back.
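
For context on what tuning information remains available, the updated tests in this PR assert on a one-line summary returned by `cache.describe_stats()`. The sketch below reproduces that string format from raw counters. The function signature and the zero-lookup handling are assumptions; the PR's actual implementation is not shown in this thread.

```python
def describe_stats(used_bytes, max_bytes, hits, lookups, evictions):
    """Formats cache counters in the style asserted by the PR's tests."""
    # Assumption: report 0% rather than dividing by zero before any lookup.
    hit_ratio = 100.0 * hits / lookups if lookups else 0.0
    return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
        used_bytes >> 20, max_bytes >> 20, hit_ratio, lookups, evictions)


summary = describe_stats(1 << 20, 5 << 20, hits=1, lookups=2, evictions=0)
```

A low hit percentage together with a high eviction count would suggest the cache is too small for the working set.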




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962022811


##########
sdks/python/apache_beam/runners/worker/statecache_test.py:
##########
@@ -21,209 +21,157 @@
 import logging
 import unittest
 
-from apache_beam.metrics import monitoring_infos
+from apache_beam.runners.worker.statecache import CacheAware
 from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.runners.worker.statecache import WeightedValue
 
 
 class StateCacheTest(unittest.TestCase):
   def test_empty_cache_get(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     self.assertEqual(cache.get("key", 'cache_token'), None)
     with self.assertRaises(Exception):
       # Invalid cache token provided
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 1,
-            'put': 0,
-            'miss': 1,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 1, evictions 0')
 
   def test_put_get(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
     self.assertEqual(cache.size(), 1)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key", "cache_token2"), None)
     with self.assertRaises(Exception):
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 1,
-            'miss': 1,
-            'hit': 1,
-            'clear': 0,
-            'evict': 0,
-            'size': 1,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0')
 
   def test_clear(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     cache.clear("new-key", "cache_token")
-    cache.put("key", "cache_token", ["value"])
+    cache.put("key", "cache_token", WeightedValue(["value"], 1 << 20))

Review Comment:
   We specifically test the `get_deep_size` version with `test_default_sized_put` and use `WeightedValue` for the other tests so that their results are consistent and do not depend on Python's GC and size estimation implementations.
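
The following sketch shows why explicit weights make such tests deterministic. `WeightedValue` mirrors the class in this PR; `TinyWeightedLru` is a hypothetical, single-threaded stand-in for the real `StateCache` (no cache tokens, no locking) and is not the PR's implementation.

```python
import collections


class WeightedValue(object):
    """Value paired with an explicit, caller-supplied weight in bytes."""
    def __init__(self, value, weight):
        if weight <= 0:
            raise ValueError(
                'Expected weight to be > 0 for %s but received %d' %
                (value, weight))
        self._value = value
        self._weight = weight

    def weight(self):
        return self._weight

    def value(self):
        return self._value


class TinyWeightedLru(object):
    """Hypothetical LRU cache bounded by total weight, not entry count."""
    def __init__(self, max_weight):
        self._max_weight = max_weight
        self._current_weight = 0
        self._entries = collections.OrderedDict()
        self.evictions = 0

    def put(self, key, weighted_value):
        old = self._entries.pop(key, None)
        if old is not None:
            self._current_weight -= old.weight()
        self._entries[key] = weighted_value
        self._current_weight += weighted_value.weight()
        # Evict least recently used entries until the budget is respected.
        while self._current_weight > self._max_weight and self._entries:
            _, evicted = self._entries.popitem(last=False)
            self._current_weight -= evicted.weight()
            self.evictions += 1

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return entry.value()


cache = TinyWeightedLru(5 << 20)  # 5 MB budget
for i in range(6):
    cache.put(i, WeightedValue("v%d" % i, 1 << 20))  # 1 MB each
```

With fixed 1 MB weights, the sixth put always evicts exactly one entry, regardless of how the interpreter sizes a short string.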





[GitHub] [beam] codecov[bot] commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233309703

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22924?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22924](https://codecov.io/gh/apache/beam/pull/22924?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c3ffcef) into [master](https://codecov.io/gh/apache/beam/commit/e83192c4fe5e2bc2b8e7f6c3568d00a49d983cdb?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e83192c) will **decrease** coverage by `0.03%`.
   > The diff coverage is `84.04%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22924      +/-   ##
   ==========================================
   - Coverage   73.66%   73.62%   -0.04%     
   ==========================================
     Files         713      713              
     Lines       94970    94996      +26     
   ==========================================
   - Hits        69960    69944      -16     
   - Misses      23709    23751      +42     
     Partials     1301     1301              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.45% <84.04%> (-0.06%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22924?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...thon/apache\_beam/runners/worker/sdk\_worker\_main.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlcl9tYWluLnB5) | `78.48% <ø> (ø)` | |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `75.33% <46.15%> (-4.38%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/statecache.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc3RhdGVjYWNoZS5weQ==) | `89.69% <88.88%> (-6.47%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `77.89% <100.00%> (ø)` | |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `88.94% <100.00%> (ø)` | |
   | [...dks/python/apache\_beam/metrics/monitoring\_infos.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9tb25pdG9yaW5nX2luZm9zLnB5) | `92.50% <0.00%> (-4.50%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/schemas.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3NjaGVtYXMucHk=) | `93.84% <0.00%> (-0.31%)` | :arrow_down: |
   | [sdks/python/apache\_beam/portability/common\_urns.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvY29tbW9uX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...thon/apache\_beam/ml/inference/pytorch\_inference.py](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3B5dG9yY2hfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
   | ... and [5 more](https://codecov.io/gh/apache/beam/pull/22924/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r957678980


##########
sdks/python/apache_beam/io/mongodbio_it_test.py:
##########
@@ -21,13 +21,12 @@
 import logging
 import time
 
-from pymongo import MongoClient

Review Comment:
   Note that I had to fix the mongodb imports because pylint was failing for me, even though it passes at head.





[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962019019


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:
##########
@@ -653,7 +651,8 @@ def __init__(self,
 
     from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment
     config = EmbeddedPythonGrpcEnvironment.parse_config(payload.decode('utf-8'))
-    self._state_cache_size = config.get('state_cache_size') or STATE_CACHE_SIZE
+    self._state_cache_size = (
+        config.get('state_cache_size') or STATE_CACHE_SIZE_MB) << 20

Review Comment:
   This is from an experiment. The idea is that we will get rid of this option and migrate to the caching options that Java has: 
   https://github.com/apache/beam/blob/4a8947e3cd8aa0a1f2ead7a471bbf7040f56e691/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java#L132
   https://github.com/apache/beam/blob/4a8947e3cd8aa0a1f2ead7a471bbf7040f56e691/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java#L150
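
A small sketch of the fallback-then-shift expression in the diff above. The default value of `STATE_CACHE_SIZE_MB` and the helper name `state_cache_size_bytes` are assumptions made for illustration.

```python
STATE_CACHE_SIZE_MB = 100  # hypothetical default, in megabytes


def state_cache_size_bytes(config):
    """Returns the configured (or default) cache size converted to bytes."""
    # The parentheses matter: `<<` binds tighter than `or`, so without them
    # a configured value would be returned in MB and never shifted.
    return (config.get('state_cache_size') or STATE_CACHE_SIZE_MB) << 20


default_size = state_cache_size_bytes({})
configured_size = state_cache_size_bytes({'state_cache_size': 5})
unparenthesized = 5 or STATE_CACHE_SIZE_MB << 20  # evaluates to 5, not bytes
```

Note that the `or` fallback also treats a configured value of `0` as unset and substitutes the default.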
   





[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962020480


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:
##########
@@ -360,16 +360,14 @@ def __init__(self,
         self, data_plane.InMemoryDataChannel(), state, provision_info)
     self.control_conn = self  # type: ignore  # need Protocol to describe this
     self.data_conn = self.data_plane_handler
-    state_cache = StateCache(STATE_CACHE_SIZE)
+    state_cache = StateCache(STATE_CACHE_SIZE_MB << 20)

Review Comment:
   done





[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1235910327

   @ryanthompson591 addressed PR comments, PTAL




[GitHub] [beam] ryanthompson591 commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r961885728


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:
##########
@@ -360,16 +360,14 @@ def __init__(self,
         self, data_plane.InMemoryDataChannel(), state, provision_info)
     self.control_conn = self  # type: ignore  # need Protocol to describe this
     self.data_conn = self.data_plane_handler
-    state_cache = StateCache(STATE_CACHE_SIZE)
+    state_cache = StateCache(STATE_CACHE_SIZE_MB << 20)

Review Comment:
   I like the idea of using a named constant here, something like:
   STATE_CACHE_SIZE_MB << MB_TO_BYTES
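
One way the suggestion could look, as a sketch only. In the expression above `MB_TO_BYTES` would have to be a shift count (20); the alternative name `BYTES_PER_MB` and the default of 100 are hypothetical.

```python
STATE_CACHE_SIZE_MB = 100  # hypothetical default, in megabytes

# Option 1: a named shift count, matching the expression suggested above.
MB_TO_BYTES = 20
size_via_shift = STATE_CACHE_SIZE_MB << MB_TO_BYTES

# Option 2: a named multiplier, which may read more naturally.
BYTES_PER_MB = 1 << 20
size_via_multiply = STATE_CACHE_SIZE_MB * BYTES_PER_MB
```

Both forms produce the same byte count; the difference is only in how the unit conversion reads at the call site.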



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -20,245 +20,180 @@
 # mypy: disallow-untyped-defs
 
 import collections
+import gc
 import logging
 import threading
-from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Hashable
 from typing import List
 from typing import Optional
-from typing import Set
 from typing import Tuple
-from typing import TypeVar
 
-from apache_beam.metrics import monitoring_infos
-
-if TYPE_CHECKING:
-  from apache_beam.portability.api import metrics_pb2
+import objsize
 
 _LOGGER = logging.getLogger(__name__)
 
-CallableT = TypeVar('CallableT', bound='Callable')
-KT = TypeVar('KT')
-VT = TypeVar('VT')
 
+class WeightedValue(object):
+  """Value type that stores corresponding weight.
 
-class Metrics(object):
-  """Metrics container for state cache metrics."""
+  :arg value The value to be stored.
+  :arg weight The associated weight of the value. If unspecified, the object's
+  size will be used.
+  """
+  def __init__(self, value, weight):
+    # type: (Any, int) -> None
+    self._value = value
+    if weight <= 0:
+      raise ValueError(
+          'Expected weight to be > 0 for %s but received %d' % (value, weight))
+    self._weight = weight
+
+  def weight(self):
+    # type: () -> int
+    return self._weight
 
-  # A set of all registered metrics
-  ALL_METRICS = set()  # type: Set[Hashable]
-  PREFIX = "beam:metric:statecache:"
+  def value(self):
+    # type: () -> Any
+    return self._value
 
-  def __init__(self):
-    # type: () -> None
-    self._context = threading.local()
 
-  def initialize(self):
+class CacheAware(object):
+  def __init__(self):
     # type: () -> None
+    pass
 
-    """Needs to be called once per thread to initialize the local metrics cache.
-    """
-    if hasattr(self._context, 'metrics'):
-      return  # Already initialized
-    self._context.metrics = collections.defaultdict(int)
-
-  def count(self, name):
-    # type: (str) -> None
-    self._context.metrics[name] += 1
-
-  def hit_miss(self, total_name, hit_miss_name):
-    # type: (str, str) -> None
-    self._context.metrics[total_name] += 1
-    self._context.metrics[hit_miss_name] += 1
+  def get_referents_for_cache(self):
+    # type: () -> List[Any]
 
-  def get_monitoring_infos(self, cache_size, cache_capacity):
-    # type: (int, int) -> List[metrics_pb2.MonitoringInfo]
+    """Returns the list of objects accounted during cache measurement."""
+    raise NotImplementedError()
 
-    """Returns the metrics scoped to the current bundle."""
-    metrics = self._context.metrics
-    if len(metrics) == 0:
-      # No metrics collected, do not report
-      return []
-    # Add all missing metrics which were not reported
-    for key in Metrics.ALL_METRICS:
-      if key not in metrics:
-        metrics[key] = 0
-    # Gauges which reflect the state since last queried
-    gauges = [
-        monitoring_infos.int64_gauge(self.PREFIX + name, val) for name,
-        val in metrics.items()
-    ]
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'size', cache_size))
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'capacity', cache_capacity))
-    # Counters for the summary across all metrics
-    counters = [
-        monitoring_infos.int64_counter(self.PREFIX + name + '_total', val)
-        for name,
-        val in metrics.items()
-    ]
-    # Reinitialize metrics for this thread/bundle
-    metrics.clear()
-    return gauges + counters
 
-  @staticmethod
-  def counter_hit_miss(total_name, hit_name, miss_name):
-    # type: (str, str, str) -> Callable[[CallableT], CallableT]
+def get_referents_for_cache(*objs):
+  # type: (List[Any]) -> List[Any]
 
-    """Decorator for counting function calls and whether
-       the return value equals None (=miss) or not (=hit)."""
-    Metrics.ALL_METRICS.update([total_name, hit_name, miss_name])
+  """Returns the list of objects accounted during cache measurement.
 
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        value = function(self, *args, **kwargs)
-        if value is None:
-          self._metrics.hit_miss(total_name, miss_name)
-        else:
-          self._metrics.hit_miss(total_name, hit_name)
-        return value
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
-
-  @staticmethod
-  def counter(metric_name):
-    # type: (str) -> Callable[[CallableT], CallableT]
-
-    """Decorator for counting function calls."""
-    Metrics.ALL_METRICS.add(metric_name)
-
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        self._metrics.count(metric_name)
-        return function(self, *args, **kwargs)
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
+  Users can inherit CacheAware to override which referents should be
+  used when measuring the deep size of the object. The default is to
+  use gc.get_referents(*objs).
+  """
+  # print(objs)
+  rval = []
+  for obj in objs:
+    if isinstance(obj, CacheAware):
+      rval.extend(obj.get_referents_for_cache())
+    else:
+      rval.extend(gc.get_referents(obj))
+  return rval
 
 
 class StateCache(object):
-  """ Cache for Beam state access, scoped by state key and cache_token.
-      Assumes a bag state implementation.
+  """Cache for Beam state access, scoped by state key and cache_token.
+     Assumes a bag state implementation.
 
-  For a given state_key, caches a (cache_token, value) tuple and allows to
+  For a given state_key and cache_token, caches a value and allows to
     a) read from the cache (get),
            if the currently stored cache_token matches the provided
-    a) write to the cache (put),
+    b) write to the cache (put),
            storing the new value alongside with a cache token
-    c) append to the currently cache item (extend),
-           if the currently stored cache_token matches the provided
     c) empty a cached element (clear),
            if the currently stored cache_token matches the provided
-    d) evict a cached element (evict)
+    d) invalidate a cached element (invalidate)
+    e) invalidate all cached elements (invalidate_all)
 
   The operations on the cache are thread-safe for use by multiple workers.
 
-  :arg max_entries The maximum number of entries to store in the cache.
-  TODO Memory-based caching: https://github.com/apache/beam/issues/19857
+  :arg max_weight The maximum weight of entries to store in the cache in bytes.
   """
-  def __init__(self, max_entries):
+  def __init__(self, max_weight):
     # type: (int) -> None
-    _LOGGER.info('Creating state cache with size %s', max_entries)
-    self._missing = None
-    self._cache = self.LRUCache[Tuple[bytes, Optional[bytes]],
-                                Any](max_entries, self._missing)
+    _LOGGER.info('Creating state cache with size %s', max_weight)
+    self._max_weight = max_weight
+    self._current_weight = 0
+    self._cache = collections.OrderedDict(
+    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    self._hit_count = 0
+    self._miss_count = 0
+    self._evict_count = 0
     self._lock = threading.RLock()
-    self._metrics = Metrics()
 
-  @Metrics.counter_hit_miss("get", "hit", "miss")
   def get(self, state_key, cache_token):
     # type: (bytes, Optional[bytes]) -> Any
     assert cache_token and self.is_cache_enabled()
+    key = (state_key, cache_token)
     with self._lock:
-      return self._cache.get((state_key, cache_token))
+      value = self._cache.get(key, None)
+      if value is None:
+        self._miss_count += 1
+        return None
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+      return value.value()
 
-  @Metrics.counter("put")
   def put(self, state_key, cache_token, value):
     # type: (bytes, Optional[bytes], Any) -> None
     assert cache_token and self.is_cache_enabled()
+    if not isinstance(value, WeightedValue):
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(

Review Comment:
   Here this logs a warning, but elsewhere a non-positive weight raises an exception. Is the difference intentional?
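
One way to make the two paths consistent is sketched below. This is only an illustration under stated assumptions: `resolve_weight` is a hypothetical helper that is not part of the PR, and `sys.getsizeof` stands in for `objsize.get_deep_size` so that the snippet needs only the standard library. The idea is that a weight supplied by the caller is a programming error when it is not positive, while a measured weight that comes out non-positive is only logged and clamped.

```python
import logging
import sys

_LOGGER = logging.getLogger(__name__)


def resolve_weight(value, explicit_weight=None):
  """Hypothetical helper: returns a positive weight in bytes for value."""
  if explicit_weight is not None:
    # A caller-supplied weight that is not positive is a caller error,
    # so fail loudly, as WeightedValue does in the diff.
    if explicit_weight <= 0:
      raise ValueError(
          'Expected weight to be > 0 for %s but received %d' %
          (value, explicit_weight))
    return explicit_weight
  # Measured weight. sys.getsizeof is shallow and is used here only as a
  # stand-in for objsize.get_deep_size.
  measured = sys.getsizeof(value)
  if measured <= 0:
    # A bad measurement is not the caller's fault: warn and clamp to 1.
    _LOGGER.warning(
        'Measured weight %d for %s, using 1 instead', measured, value)
    return 1
  return measured
```

Whether clamping or rejecting is the right behaviour for measured weights is a design choice for the PR author; the sketch only shows that the rule can be stated in one place.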



##########
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:
##########
@@ -653,7 +651,8 @@ def __init__(self,
 
     from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment
     config = EmbeddedPythonGrpcEnvironment.parse_config(payload.decode('utf-8'))
-    self._state_cache_size = config.get('state_cache_size') or STATE_CACHE_SIZE
+    self._state_cache_size = (
+        config.get('state_cache_size') or STATE_CACHE_SIZE_MB) << 20

Review Comment:
   Is it clear that state_cache_size in the config is in MB?
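
For reference, the unit conversion in the changed line can be sketched as follows. The function name `state_cache_size_bytes` and the default value of `STATE_CACHE_SIZE_MB` are assumptions made for this example and are not taken from the diff; only the `config.get(...) or DEFAULT` pattern and the `<< 20` shift come from it.

```python
# Assumed default, for illustration only; the real constant lives in Beam.
STATE_CACHE_SIZE_MB = 100


def state_cache_size_bytes(config):
  """Hypothetical helper: reads a size in MB and returns it in bytes."""
  # `or` falls back to the default when the key is missing, None, or 0.
  size_mb = config.get('state_cache_size') or STATE_CACHE_SIZE_MB
  # Shifting left by 20 bits multiplies by 1024 * 1024 (MB -> bytes).
  return size_mb << 20
```

Note that, as written, a configured value of 0 is indistinguishable from a missing value, so the default is used in both cases. Naming the unit in the config key or in its documentation would answer the question above.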



##########
sdks/python/apache_beam/runners/worker/statecache_test.py:
##########
@@ -21,209 +21,157 @@
 import logging
 import unittest
 
-from apache_beam.metrics import monitoring_infos
+from apache_beam.runners.worker.statecache import CacheAware
 from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.runners.worker.statecache import WeightedValue
 
 
 class StateCacheTest(unittest.TestCase):
   def test_empty_cache_get(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     self.assertEqual(cache.get("key", 'cache_token'), None)
     with self.assertRaises(Exception):
       # Invalid cache token provided
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 1,
-            'put': 0,
-            'miss': 1,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 1, evictions 0')
 
   def test_put_get(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
     self.assertEqual(cache.size(), 1)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key", "cache_token2"), None)
     with self.assertRaises(Exception):
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 1,
-            'miss': 1,
-            'hit': 1,
-            'clear': 0,
-            'evict': 0,
-            'size': 1,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0')
 
   def test_clear(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     cache.clear("new-key", "cache_token")
-    cache.put("key", "cache_token", ["value"])
+    cache.put("key", "cache_token", WeightedValue(["value"], 1 << 20))

Review Comment:
   I might be confused, but isn't a weighted value something different from a regular value? Would it make sense for these tests to stick with regular values and to add a new set of tests for weighted values?
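
To make the distinction concrete, here is a minimal sketch of a weight-bounded LRU cache that accepts both kinds of value. `TinyWeightedCache` is a hypothetical stand-in, not the `StateCache` from the PR: it has no cache tokens, locking, or statistics, and it measures plain values with the shallow `sys.getsizeof` where the PR uses `objsize.get_deep_size`. It does follow the same `OrderedDict` and `move_to_end` approach that appears in the diff.

```python
import collections
import sys


class TinyWeightedCache(object):
  """Hypothetical stand-in for StateCache: LRU eviction bounded by weight."""
  def __init__(self, max_weight):
    self._max_weight = max_weight
    self._current_weight = 0
    self._cache = collections.OrderedDict()  # key -> (value, weight)

  def put(self, key, value, weight=None):
    if weight is None:
      # Plain value: measure it (shallow stand-in for a deep size).
      weight = sys.getsizeof(value)
    old = self._cache.pop(key, None)
    if old is not None:
      self._current_weight -= old[1]
    self._cache[key] = (value, weight)
    self._current_weight += weight
    # Evict least recently used entries until the budget is respected.
    while self._current_weight > self._max_weight and self._cache:
      _, (_, evicted_weight) = self._cache.popitem(last=False)
      self._current_weight -= evicted_weight

  def get(self, key):
    entry = self._cache.get(key)
    if entry is None:
      return None
    # A read marks the entry as most recently used.
    self._cache.move_to_end(key)
    return entry[0]

  def size(self):
    return len(self._cache)
```

With an explicit weight the test controls exactly when eviction happens; with a plain value the outcome depends on the measured size, which is why the two cases may deserve separate tests.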



##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -20,245 +20,180 @@
 # mypy: disallow-untyped-defs
 
 import collections
+import gc
 import logging
 import threading
-from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Hashable
 from typing import List
 from typing import Optional
-from typing import Set
 from typing import Tuple
-from typing import TypeVar
 
-from apache_beam.metrics import monitoring_infos
-
-if TYPE_CHECKING:
-  from apache_beam.portability.api import metrics_pb2
+import objsize
 
 _LOGGER = logging.getLogger(__name__)
 
-CallableT = TypeVar('CallableT', bound='Callable')
-KT = TypeVar('KT')
-VT = TypeVar('VT')
 
+class WeightedValue(object):
+  """Value type that stores corresponding weight.
 
-class Metrics(object):
-  """Metrics container for state cache metrics."""
+  :arg value The value to be stored.
+  :arg weight The associated weight of the value. If unspecified, the object's
+  size will be used.
+  """
+  def __init__(self, value, weight):
+    # type: (Any, int) -> None
+    self._value = value
+    if weight <= 0:
+      raise ValueError(
+          'Expected weight to be > 0 for %s but received %d' % (value, weight))
+    self._weight = weight
+
+  def weight(self):
+    # type: () -> int
+    return self._weight
 
-  # A set of all registered metrics
-  ALL_METRICS = set()  # type: Set[Hashable]
-  PREFIX = "beam:metric:statecache:"
+  def value(self):
+    # type: () -> Any
+    return self._value
 
-  def __init__(self):
-    # type: () -> None
-    self._context = threading.local()
 
-  def initialize(self):
+class CacheAware(object):
+  def __init__(self):
     # type: () -> None
+    pass
 
-    """Needs to be called once per thread to initialize the local metrics cache.
-    """
-    if hasattr(self._context, 'metrics'):
-      return  # Already initialized
-    self._context.metrics = collections.defaultdict(int)
-
-  def count(self, name):
-    # type: (str) -> None
-    self._context.metrics[name] += 1
-
-  def hit_miss(self, total_name, hit_miss_name):
-    # type: (str, str) -> None
-    self._context.metrics[total_name] += 1
-    self._context.metrics[hit_miss_name] += 1
+  def get_referents_for_cache(self):
+    # type: () -> List[Any]
 
-  def get_monitoring_infos(self, cache_size, cache_capacity):
-    # type: (int, int) -> List[metrics_pb2.MonitoringInfo]
+    """Returns the list of objects accounted during cache measurement."""
+    raise NotImplementedError()
 
-    """Returns the metrics scoped to the current bundle."""
-    metrics = self._context.metrics
-    if len(metrics) == 0:
-      # No metrics collected, do not report
-      return []
-    # Add all missing metrics which were not reported
-    for key in Metrics.ALL_METRICS:
-      if key not in metrics:
-        metrics[key] = 0
-    # Gauges which reflect the state since last queried
-    gauges = [
-        monitoring_infos.int64_gauge(self.PREFIX + name, val) for name,
-        val in metrics.items()
-    ]
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'size', cache_size))
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'capacity', cache_capacity))
-    # Counters for the summary across all metrics
-    counters = [
-        monitoring_infos.int64_counter(self.PREFIX + name + '_total', val)
-        for name,
-        val in metrics.items()
-    ]
-    # Reinitialize metrics for this thread/bundle
-    metrics.clear()
-    return gauges + counters
 
-  @staticmethod
-  def counter_hit_miss(total_name, hit_name, miss_name):
-    # type: (str, str, str) -> Callable[[CallableT], CallableT]
+def get_referents_for_cache(*objs):
+  # type: (List[Any]) -> List[Any]
 
-    """Decorator for counting function calls and whether
-       the return value equals None (=miss) or not (=hit)."""
-    Metrics.ALL_METRICS.update([total_name, hit_name, miss_name])
+  """Returns the list of objects accounted during cache measurement.
 
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        value = function(self, *args, **kwargs)
-        if value is None:
-          self._metrics.hit_miss(total_name, miss_name)
-        else:
-          self._metrics.hit_miss(total_name, hit_name)
-        return value
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
-
-  @staticmethod
-  def counter(metric_name):
-    # type: (str) -> Callable[[CallableT], CallableT]
-
-    """Decorator for counting function calls."""
-    Metrics.ALL_METRICS.add(metric_name)
-
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        self._metrics.count(metric_name)
-        return function(self, *args, **kwargs)
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
+  Users can inherit CacheAware to override which referents should be
+  used when measuring the deep size of the object. The default is to
+  use gc.get_referents(*objs).
+  """
+  # print(objs)

Review Comment:
   remove this line?
   



##########
sdks/python/apache_beam/runners/worker/statecache_test.py:
##########
@@ -21,209 +21,157 @@
 import logging
 import unittest
 
-from apache_beam.metrics import monitoring_infos
+from apache_beam.runners.worker.statecache import CacheAware
 from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.runners.worker.statecache import WeightedValue
 
 
 class StateCacheTest(unittest.TestCase):
   def test_empty_cache_get(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     self.assertEqual(cache.get("key", 'cache_token'), None)
     with self.assertRaises(Exception):
       # Invalid cache token provided
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 1,
-            'put': 0,
-            'miss': 1,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 1, evictions 0')
 
   def test_put_get(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
     self.assertEqual(cache.size(), 1)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key", "cache_token2"), None)
     with self.assertRaises(Exception):
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 1,
-            'miss': 1,
-            'hit': 1,
-            'clear': 0,
-            'evict': 0,
-            'size': 1,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0')
 
   def test_clear(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     cache.clear("new-key", "cache_token")
-    cache.put("key", "cache_token", ["value"])
+    cache.put("key", "cache_token", WeightedValue(["value"], 1 << 20))
     self.assertEqual(cache.size(), 2)
     self.assertEqual(cache.get("new-key", "new_token"), None)
     self.assertEqual(cache.get("key", "cache_token"), ['value'])
     # test clear without existing key/token
     cache.clear("non-existing", "token")
     self.assertEqual(cache.size(), 3)
     self.assertEqual(cache.get("non-existing", "token"), [])
-    self.verify_metrics(
-        cache,
-        {
-            'get': 3,
-            'put': 1,
-            'miss': 1,
-            'hit': 2,
-            'clear': 2,
-            'evict': 0,
-            'size': 3,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 66.67%, lookups 3, evictions 0')
+
+  def test_default_sized_put(self):
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", bytearray(1 << 20))
+    cache.put("key2", "cache_token", bytearray(1 << 20))
+    cache.put("key3", "cache_token", bytearray(1 << 20))
+    self.assertEqual(cache.get("key3", "cache_token"), bytearray(1 << 20))
+    cache.put("key4", "cache_token", bytearray(1 << 20))
+    cache.put("key5", "cache_token", bytearray(1 << 20))
+    # note that each byte array instance takes slightly over 1 MB which is why
+    # these 5 byte arrays can't all be stored in the cache causing a single
+    # eviction
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 4/5 MB, hit 100.00%, lookups 1, evictions 1')
 
   def test_max_size(self):
-    cache = self.get_cache(2)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token", "value")
-    self.assertEqual(cache.size(), 2)
-    cache.put("key2", "cache_token", "value")
+    cache = StateCache(2 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    cache.put("key", "cache_token", "value")
+    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 0,
-            'put': 4,
-            'miss': 0,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 2,
-            'capacity': 2
-        })
-
-  def test_evict_all(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token", "value2")
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 2/2 MB, hit 100.00%, lookups 0, evictions 1')
+
+  def test_invalidate_all(self):
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    cache.evict_all()
+    cache.invalidate_all()
     self.assertEqual(cache.size(), 0)
     self.assertEqual(cache.get("key", "cache_token"), None)
     self.assertEqual(cache.get("key2", "cache_token"), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 2,
-            'miss': 2,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 2, evictions 0')
 
   def test_lru(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token2", "value2")
-    cache.put("key3", "cache_token", "value0")
-    cache.put("key3", "cache_token", "value3")
-    cache.put("key4", "cache_token4", "value4")
-    cache.put("key5", "cache_token", "value0")
-    cache.put("key5", "cache_token", ["value5"])
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token2", WeightedValue("value2", 1 << 20))
+    cache.put("key3", "cache_token", WeightedValue("value0", 1 << 20))
+    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
+    cache.put("key4", "cache_token4", WeightedValue("value4", 1 << 20))
+    cache.put("key5", "cache_token", WeightedValue("value0", 1 << 20))
+    cache.put("key5", "cache_token", WeightedValue(["value5"], 1 << 20))
     self.assertEqual(cache.size(), 5)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key2", "cache_token2"), "value2")
     self.assertEqual(cache.get("key3", "cache_token"), "value3")
     self.assertEqual(cache.get("key4", "cache_token4"), "value4")
     self.assertEqual(cache.get("key5", "cache_token"), ["value5"])
     # insert another key to trigger cache eviction
-    cache.put("key6", "cache_token2", "value7")
+    cache.put("key6", "cache_token2", WeightedValue("value6", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key")
     self.assertEqual(cache.get("key", "cache_token"), None)
     # trigger a read on "key2"
     cache.get("key2", "cache_token2")
     # insert another key to trigger cache eviction
-    cache.put("key7", "cache_token", "value7")
+    cache.put("key7", "cache_token", WeightedValue("value7", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key3")
     self.assertEqual(cache.get("key3", "cache_token"), None)
     # trigger a put on "key2"
-    cache.put("key2", "cache_token", "put")
+    cache.put("key2", "cache_token", WeightedValue("put", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # insert another key to trigger cache eviction
-    cache.put("key8", "cache_token", "value8")
+    cache.put("key8", "cache_token", WeightedValue("value8", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key4")
     self.assertEqual(cache.get("key4", "cache_token"), None)
     # make "key5" used by writing to it
-    cache.put("key5", "cache_token", "val")
+    cache.put("key5", "cache_token", WeightedValue("val", 1 << 20))
     # least recently used key should be gone ("key6")
     self.assertEqual(cache.get("key6", "cache_token"), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 10,
-            'put': 12,
-            'miss': 4,
-            'hit': 6,
-            'clear': 0,
-            'evict': 0,
-            'size': 5,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 5/5 MB, hit 60.00%, lookups 10, evictions 5')
 
   def test_is_cached_enabled(self):
-    cache = self.get_cache(1)
+    cache = StateCache(1 << 20)
     self.assertEqual(cache.is_cache_enabled(), True)
-    self.verify_metrics(cache, {})
-    cache = self.get_cache(0)
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/1 MB, hit 100.00%, lookups 0, evictions 0')
+    cache = StateCache(0)
     self.assertEqual(cache.is_cache_enabled(), False)
-    self.verify_metrics(cache, {})
-
-  def verify_metrics(self, cache, expected_metrics):
-    infos = cache.get_monitoring_infos()
-    # Reconstruct metrics dictionary from monitoring infos
-    metrics = {
-        info.urn.rsplit(':',
-                        1)[1]: monitoring_infos.extract_gauge_value(info)[1]
-        for info in infos if "_total" not in info.urn and
-        info.type == monitoring_infos.LATEST_INT64_TYPE
-    }
-    self.assertDictEqual(metrics, expected_metrics)
-    # Metrics and total metrics should be identical for a single bundle.
-    # The following two gauges are not part of the total metrics:
-    try:
-      del metrics['capacity']
-      del metrics['size']
-    except KeyError:
-      pass
-    total_metrics = {
-        info.urn.rsplit(':', 1)[1].rsplit("_total")[0]:
-        monitoring_infos.extract_counter_value(info)
-        for info in infos
-        if "_total" in info.urn and info.type == monitoring_infos.SUM_INT64_TYPE
-    }
-    self.assertDictEqual(metrics, total_metrics)
-
-  @staticmethod
-  def get_cache(size):
-    cache = StateCache(size)
-    cache.initialize_metrics()
-    return cache
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/0 MB, hit 100.00%, lookups 0, evictions 0')
+
+  def test_get_referents_for_cache(self):
+    class GetReferentsForCache(CacheAware):
+      def __init__(self):
+        self.key = bytearray(1 << 20)
+        self.value = bytearray(2 << 20)
+
+      def get_referents_for_cache(self):
+        return [self.key]
+
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", GetReferentsForCache())

Review Comment:
   My first reading of this was a little hard, but I think it makes sense.
   
   If I don't have this right, maybe add a comment so that it is clear.
   
   CacheAware allows overriding get_referents_for_cache to return a list of objects. It's not clear to me why we return only the key in this test instead of the key and the value.
   
   Then we're just testing that we measure the size of the referents as returned by this object, right?
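
The effect of the override can be sketched with a simplified deep-size walk. This is an illustration only: `deep_size` is a hypothetical, much reduced stand-in for `objsize.get_deep_size` (it skips class objects and does nothing else to bound the traversal), while `CacheAware` and `get_referents_for_cache` mirror the definitions shown in the diff. `Sample` plays the role of the test's `GetReferentsForCache` class.

```python
import gc
import sys


class CacheAware(object):
  def get_referents_for_cache(self):
    """Returns the list of objects accounted during cache measurement."""
    raise NotImplementedError()


def get_referents_for_cache(*objs):
  rval = []
  for obj in objs:
    if isinstance(obj, CacheAware):
      rval.extend(obj.get_referents_for_cache())
    else:
      rval.extend(gc.get_referents(obj))
  return rval


def deep_size(root):
  """Simplified stand-in for objsize.get_deep_size (an assumption)."""
  seen = {id(root)}
  stack = [root]
  total = 0
  while stack:
    obj = stack.pop()
    total += sys.getsizeof(obj)
    for ref in get_referents_for_cache(obj):
      # Skip class objects so the walk does not wander into module state.
      if id(ref) not in seen and not isinstance(ref, type):
        seen.add(id(ref))
        stack.append(ref)
  return total


class Sample(CacheAware):
  def __init__(self):
    self.key = bytearray(1 << 20)
    self.value = bytearray(2 << 20)

  def get_referents_for_cache(self):
    # Only the key is reported, so the 2 MB value is not measured.
    return [self.key]
```

Under these assumptions `deep_size(Sample())` comes out slightly above 1 MB rather than above 3 MB, which is what returning only the key is meant to demonstrate: the measurement follows exactly the referents the object reports.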



##########
sdks/python/apache_beam/runners/portability/flink_runner_test.py:
##########
@@ -296,95 +292,6 @@ def test_flattened_side_input(self):
   def test_metrics(self):
     super().test_metrics(check_gauge=False)
 
-  def test_flink_metrics(self):

Review Comment:
   Why does this PR remove the metrics test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ryanthompson591 commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962033392


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -20,245 +20,179 @@
 # mypy: disallow-untyped-defs
 
 import collections
+import gc
 import logging
 import threading
-from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Hashable
 from typing import List
 from typing import Optional
-from typing import Set
 from typing import Tuple
-from typing import TypeVar
 
-from apache_beam.metrics import monitoring_infos
-
-if TYPE_CHECKING:
-  from apache_beam.portability.api import metrics_pb2
+import objsize
 
 _LOGGER = logging.getLogger(__name__)
 
-CallableT = TypeVar('CallableT', bound='Callable')
-KT = TypeVar('KT')
-VT = TypeVar('VT')
 
+class WeightedValue(object):
+  """Value type that stores corresponding weight.
 
-class Metrics(object):
-  """Metrics container for state cache metrics."""
+  :arg value The value to be stored.
+  :arg weight The associated weight of the value. If unspecified, the object's

Review Comment:
   A suggestion: maybe specify the unit of the weight. I think it's supposed to be bytes, is that right?





[GitHub] [beam] github-actions[bot] commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233336542

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] pabloem commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r957742170


##########
sdks/python/setup.py:
##########
@@ -223,6 +223,7 @@ def get_portability_package_data():
         'hdfs>=2.1.0,<3.0.0',
         'httplib2>=0.8,<0.21.0',
         'numpy>=1.14.3,<1.23.0',
+        'objsize>=0.5.1,<1'

Review Comment:
   ```suggestion
           'objsize>=0.5.1,<1',
   ```
   You need the trailing comma.
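
The reason the comma matters can be shown in a few lines. Python concatenates adjacent string literals, so a missing comma inside a list silently merges two entries into one. The entry `'next-package>=1.0'` below is a hypothetical placeholder for whatever requirement follows in `setup.py`; it is not taken from the diff.

```python
# Without the trailing comma, the two adjacent literals are joined into a
# single string, producing one invalid requirement instead of two.
without_comma = [
    'numpy>=1.14.3,<1.23.0',
    'objsize>=0.5.1,<1'
    'next-package>=1.0',  # hypothetical next entry
]

# With the comma, each requirement stays a separate list element.
with_comma = [
    'numpy>=1.14.3,<1.23.0',
    'objsize>=0.5.1,<1',
    'next-package>=1.0',  # hypothetical next entry
]

print(len(without_comma), without_comma[1])
print(len(with_comma), with_comma[1])
```

No error is raised when the list is built, so the mistake would only surface later, when the merged requirement string is parsed.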





[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962022318


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -20,245 +20,180 @@
 # mypy: disallow-untyped-defs
 
 import collections
+import gc
 import logging
 import threading
-from typing import TYPE_CHECKING
 from typing import Any
-from typing import Callable
-from typing import Generic
-from typing import Hashable
 from typing import List
 from typing import Optional
-from typing import Set
 from typing import Tuple
-from typing import TypeVar
 
-from apache_beam.metrics import monitoring_infos
-
-if TYPE_CHECKING:
-  from apache_beam.portability.api import metrics_pb2
+import objsize
 
 _LOGGER = logging.getLogger(__name__)
 
-CallableT = TypeVar('CallableT', bound='Callable')
-KT = TypeVar('KT')
-VT = TypeVar('VT')
 
+class WeightedValue(object):
+  """Value type that stores corresponding weight.
 
-class Metrics(object):
-  """Metrics container for state cache metrics."""
+  :arg value The value to be stored.
+  :arg weight The associated weight of the value. If unspecified, the object's
+  size will be used.
+  """
+  def __init__(self, value, weight):
+    # type: (Any, int) -> None
+    self._value = value
+    if weight <= 0:
+      raise ValueError(
+          'Expected weight to be > 0 for %s but received %d' % (value, weight))
+    self._weight = weight
+
+  def weight(self):
+    # type: () -> int
+    return self._weight
 
-  # A set of all registered metrics
-  ALL_METRICS = set()  # type: Set[Hashable]
-  PREFIX = "beam:metric:statecache:"
+  def value(self):
+    # type: () -> Any
+    return self._value
 
-  def __init__(self):
-    # type: () -> None
-    self._context = threading.local()
 
-  def initialize(self):
+class CacheAware(object):
+  def __init__(self):
     # type: () -> None
+    pass
 
-    """Needs to be called once per thread to initialize the local metrics cache.
-    """
-    if hasattr(self._context, 'metrics'):
-      return  # Already initialized
-    self._context.metrics = collections.defaultdict(int)
-
-  def count(self, name):
-    # type: (str) -> None
-    self._context.metrics[name] += 1
-
-  def hit_miss(self, total_name, hit_miss_name):
-    # type: (str, str) -> None
-    self._context.metrics[total_name] += 1
-    self._context.metrics[hit_miss_name] += 1
+  def get_referents_for_cache(self):
+    # type: () -> List[Any]
 
-  def get_monitoring_infos(self, cache_size, cache_capacity):
-    # type: (int, int) -> List[metrics_pb2.MonitoringInfo]
+    """Returns the list of objects accounted during cache measurement."""
+    raise NotImplementedError()
 
-    """Returns the metrics scoped to the current bundle."""
-    metrics = self._context.metrics
-    if len(metrics) == 0:
-      # No metrics collected, do not report
-      return []
-    # Add all missing metrics which were not reported
-    for key in Metrics.ALL_METRICS:
-      if key not in metrics:
-        metrics[key] = 0
-    # Gauges which reflect the state since last queried
-    gauges = [
-        monitoring_infos.int64_gauge(self.PREFIX + name, val) for name,
-        val in metrics.items()
-    ]
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'size', cache_size))
-    gauges.append(
-        monitoring_infos.int64_gauge(self.PREFIX + 'capacity', cache_capacity))
-    # Counters for the summary across all metrics
-    counters = [
-        monitoring_infos.int64_counter(self.PREFIX + name + '_total', val)
-        for name,
-        val in metrics.items()
-    ]
-    # Reinitialize metrics for this thread/bundle
-    metrics.clear()
-    return gauges + counters
 
-  @staticmethod
-  def counter_hit_miss(total_name, hit_name, miss_name):
-    # type: (str, str, str) -> Callable[[CallableT], CallableT]
+def get_referents_for_cache(*objs):
+  # type: (List[Any]) -> List[Any]
 
-    """Decorator for counting function calls and whether
-       the return value equals None (=miss) or not (=hit)."""
-    Metrics.ALL_METRICS.update([total_name, hit_name, miss_name])
+  """Returns the list of objects accounted during cache measurement.
 
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        value = function(self, *args, **kwargs)
-        if value is None:
-          self._metrics.hit_miss(total_name, miss_name)
-        else:
-          self._metrics.hit_miss(total_name, hit_name)
-        return value
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
-
-  @staticmethod
-  def counter(metric_name):
-    # type: (str) -> Callable[[CallableT], CallableT]
-
-    """Decorator for counting function calls."""
-    Metrics.ALL_METRICS.add(metric_name)
-
-    def decorator(function):
-      # type: (CallableT) -> CallableT
-      def reporter(self, *args, **kwargs):
-        # type: (StateCache, Any, Any) -> Any
-        self._metrics.count(metric_name)
-        return function(self, *args, **kwargs)
-
-      return reporter  # type: ignore[return-value]
-
-    return decorator
+  Users can inherit CacheAware to override which referents should be
+  used when measuring the deep size of the object. The default is to
+  use gc.get_referents(*objs).
+  """
+  # print(objs)
+  rval = []
+  for obj in objs:
+    if isinstance(obj, CacheAware):
+      rval.extend(obj.get_referents_for_cache())
+    else:
+      rval.extend(gc.get_referents(obj))
+  return rval
 
 
 class StateCache(object):
-  """ Cache for Beam state access, scoped by state key and cache_token.
-      Assumes a bag state implementation.
+  """Cache for Beam state access, scoped by state key and cache_token.
+     Assumes a bag state implementation.
 
-  For a given state_key, caches a (cache_token, value) tuple and allows to
+  For a given state_key and cache_token, caches a value and allows to
     a) read from the cache (get),
            if the currently stored cache_token matches the provided
-    a) write to the cache (put),
+    b) write to the cache (put),
            storing the new value alongside with a cache token
-    c) append to the currently cache item (extend),
-           if the currently stored cache_token matches the provided
     c) empty a cached element (clear),
            if the currently stored cache_token matches the provided
-    d) evict a cached element (evict)
+    d) invalidate a cached element (invalidate)
+    e) invalidate all cached elements (invalidate_all)
 
   The operations on the cache are thread-safe for use by multiple workers.
 
-  :arg max_entries The maximum number of entries to store in the cache.
-  TODO Memory-based caching: https://github.com/apache/beam/issues/19857
+  :arg max_weight The maximum weight of entries to store in the cache in bytes.
   """
-  def __init__(self, max_entries):
+  def __init__(self, max_weight):
     # type: (int) -> None
-    _LOGGER.info('Creating state cache with size %s', max_entries)
-    self._missing = None
-    self._cache = self.LRUCache[Tuple[bytes, Optional[bytes]],
-                                Any](max_entries, self._missing)
+    _LOGGER.info('Creating state cache with size %s', max_weight)
+    self._max_weight = max_weight
+    self._current_weight = 0
+    self._cache = collections.OrderedDict(
+    )  # type: collections.OrderedDict[Tuple[bytes, Optional[bytes]], WeightedValue]
+    self._hit_count = 0
+    self._miss_count = 0
+    self._evict_count = 0
     self._lock = threading.RLock()
-    self._metrics = Metrics()
 
-  @Metrics.counter_hit_miss("get", "hit", "miss")
   def get(self, state_key, cache_token):
     # type: (bytes, Optional[bytes]) -> Any
     assert cache_token and self.is_cache_enabled()
+    key = (state_key, cache_token)
     with self._lock:
-      return self._cache.get((state_key, cache_token))
+      value = self._cache.get(key, None)
+      if value is None:
+        self._miss_count += 1
+        return None
+      self._cache.move_to_end(key)
+      self._hit_count += 1
+      return value.value()
 
-  @Metrics.counter("put")
   def put(self, state_key, cache_token, value):
     # type: (bytes, Optional[bytes], Any) -> None
     assert cache_token and self.is_cache_enabled()
+    if not isinstance(value, WeightedValue):
+      weight = objsize.get_deep_size(
+          value, get_referents_func=get_referents_for_cache)
+      if weight <= 0:
+        _LOGGER.warning(

Review Comment:
   Incorrect usage of `WeightedValue` is a programming bug and is easy to fix. Users, on the other hand, may not even know they are using `get_deep_size`, and fixing a sizing issue in a possibly deep and complex object type is much harder, so it seems like we should log and continue here instead of causing the pipeline to get stuck.
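
To illustrate the distinction drawn in this comment, here is a minimal, self-contained sketch. It is not the code from the PR: `to_weighted_value`, its `measure` parameter, and the `fallback_weight` of 8 are hypothetical names and values chosen for the example, and `sys.getsizeof` stands in for the deep-size measurement. An explicit `WeightedValue` with a bad weight raises, while a bad measured size is logged and replaced with a small fallback.

```python
import logging
import sys

_LOGGER = logging.getLogger(__name__)


class WeightedValue(object):
  """Pairs a value with an explicit, strictly positive weight in bytes."""
  def __init__(self, value, weight):
    if weight <= 0:
      # Caller error: cheap to diagnose and fix, so fail loudly.
      raise ValueError(
          'Expected weight to be > 0 for %s but received %d' % (value, weight))
    self._value = value
    self._weight = weight

  def weight(self):
    return self._weight

  def value(self):
    return self._value


def to_weighted_value(value, measure=sys.getsizeof, fallback_weight=8):
  """Wraps value with a weight, logging and continuing on a bad measurement.

  measure and fallback_weight are assumptions made for this sketch only.
  """
  if isinstance(value, WeightedValue):
    return value
  weight = measure(value)
  if weight <= 0:
    # The user did not choose this weight, so do not fail the pipeline.
    _LOGGER.warning(
        'Expected measured size to be > 0 for %s but received %d; using %d',
        type(value),
        weight,
        fallback_weight)
    weight = fallback_weight
  return WeightedValue(value, weight)
```

With this shape, a value whose measured size comes back as 0 is still cached with the fallback weight, whereas `WeightedValue("x", 0)` raises `ValueError` immediately.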



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962023481


##########
sdks/python/apache_beam/runners/worker/statecache_test.py:
##########
@@ -21,209 +21,157 @@
 import logging
 import unittest
 
-from apache_beam.metrics import monitoring_infos
+from apache_beam.runners.worker.statecache import CacheAware
 from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.runners.worker.statecache import WeightedValue
 
 
 class StateCacheTest(unittest.TestCase):
   def test_empty_cache_get(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     self.assertEqual(cache.get("key", 'cache_token'), None)
     with self.assertRaises(Exception):
       # Invalid cache token provided
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 1,
-            'put': 0,
-            'miss': 1,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 1, evictions 0')
 
   def test_put_get(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
     self.assertEqual(cache.size(), 1)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key", "cache_token2"), None)
     with self.assertRaises(Exception):
       self.assertEqual(cache.get("key", None), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 1,
-            'miss': 1,
-            'hit': 1,
-            'clear': 0,
-            'evict': 0,
-            'size': 1,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0')
 
   def test_clear(self):
-    cache = self.get_cache(5)
+    cache = StateCache(5 << 20)
     cache.clear("new-key", "cache_token")
-    cache.put("key", "cache_token", ["value"])
+    cache.put("key", "cache_token", WeightedValue(["value"], 1 << 20))
     self.assertEqual(cache.size(), 2)
     self.assertEqual(cache.get("new-key", "new_token"), None)
     self.assertEqual(cache.get("key", "cache_token"), ['value'])
     # test clear without existing key/token
     cache.clear("non-existing", "token")
     self.assertEqual(cache.size(), 3)
     self.assertEqual(cache.get("non-existing", "token"), [])
-    self.verify_metrics(
-        cache,
-        {
-            'get': 3,
-            'put': 1,
-            'miss': 1,
-            'hit': 2,
-            'clear': 2,
-            'evict': 0,
-            'size': 3,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 1/5 MB, hit 66.67%, lookups 3, evictions 0')
+
+  def test_default_sized_put(self):
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", bytearray(1 << 20))
+    cache.put("key2", "cache_token", bytearray(1 << 20))
+    cache.put("key3", "cache_token", bytearray(1 << 20))
+    self.assertEqual(cache.get("key3", "cache_token"), bytearray(1 << 20))
+    cache.put("key4", "cache_token", bytearray(1 << 20))
+    cache.put("key5", "cache_token", bytearray(1 << 20))
+    # note that each byte array instance takes slightly over 1 MB which is why
+    # these 5 byte arrays can't all be stored in the cache causing a single
+    # eviction
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 4/5 MB, hit 100.00%, lookups 1, evictions 1')
 
   def test_max_size(self):
-    cache = self.get_cache(2)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token", "value")
-    self.assertEqual(cache.size(), 2)
-    cache.put("key2", "cache_token", "value")
+    cache = StateCache(2 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    cache.put("key", "cache_token", "value")
+    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 0,
-            'put': 4,
-            'miss': 0,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 2,
-            'capacity': 2
-        })
-
-  def test_evict_all(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token", "value2")
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 2/2 MB, hit 100.00%, lookups 0, evictions 1')
+
+  def test_invalidate_all(self):
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token", WeightedValue("value2", 1 << 20))
     self.assertEqual(cache.size(), 2)
-    cache.evict_all()
+    cache.invalidate_all()
     self.assertEqual(cache.size(), 0)
     self.assertEqual(cache.get("key", "cache_token"), None)
     self.assertEqual(cache.get("key2", "cache_token"), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 2,
-            'put': 2,
-            'miss': 2,
-            'hit': 0,
-            'clear': 0,
-            'evict': 0,
-            'size': 0,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/5 MB, hit 0.00%, lookups 2, evictions 0')
 
   def test_lru(self):
-    cache = self.get_cache(5)
-    cache.put("key", "cache_token", "value")
-    cache.put("key2", "cache_token2", "value2")
-    cache.put("key3", "cache_token", "value0")
-    cache.put("key3", "cache_token", "value3")
-    cache.put("key4", "cache_token4", "value4")
-    cache.put("key5", "cache_token", "value0")
-    cache.put("key5", "cache_token", ["value5"])
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", WeightedValue("value", 1 << 20))
+    cache.put("key2", "cache_token2", WeightedValue("value2", 1 << 20))
+    cache.put("key3", "cache_token", WeightedValue("value0", 1 << 20))
+    cache.put("key3", "cache_token", WeightedValue("value3", 1 << 20))
+    cache.put("key4", "cache_token4", WeightedValue("value4", 1 << 20))
+    cache.put("key5", "cache_token", WeightedValue("value0", 1 << 20))
+    cache.put("key5", "cache_token", WeightedValue(["value5"], 1 << 20))
     self.assertEqual(cache.size(), 5)
     self.assertEqual(cache.get("key", "cache_token"), "value")
     self.assertEqual(cache.get("key2", "cache_token2"), "value2")
     self.assertEqual(cache.get("key3", "cache_token"), "value3")
     self.assertEqual(cache.get("key4", "cache_token4"), "value4")
     self.assertEqual(cache.get("key5", "cache_token"), ["value5"])
     # insert another key to trigger cache eviction
-    cache.put("key6", "cache_token2", "value7")
+    cache.put("key6", "cache_token2", WeightedValue("value6", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key")
     self.assertEqual(cache.get("key", "cache_token"), None)
     # trigger a read on "key2"
     cache.get("key2", "cache_token2")
     # insert another key to trigger cache eviction
-    cache.put("key7", "cache_token", "value7")
+    cache.put("key7", "cache_token", WeightedValue("value7", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key3")
     self.assertEqual(cache.get("key3", "cache_token"), None)
     # trigger a put on "key2"
-    cache.put("key2", "cache_token", "put")
+    cache.put("key2", "cache_token", WeightedValue("put", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # insert another key to trigger cache eviction
-    cache.put("key8", "cache_token", "value8")
+    cache.put("key8", "cache_token", WeightedValue("value8", 1 << 20))
     self.assertEqual(cache.size(), 5)
     # least recently used key should be gone ("key4")
     self.assertEqual(cache.get("key4", "cache_token"), None)
     # make "key5" used by writing to it
-    cache.put("key5", "cache_token", "val")
+    cache.put("key5", "cache_token", WeightedValue("val", 1 << 20))
     # least recently used key should be gone ("key6")
     self.assertEqual(cache.get("key6", "cache_token"), None)
-    self.verify_metrics(
-        cache,
-        {
-            'get': 10,
-            'put': 12,
-            'miss': 4,
-            'hit': 6,
-            'clear': 0,
-            'evict': 0,
-            'size': 5,
-            'capacity': 5
-        })
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 5/5 MB, hit 60.00%, lookups 10, evictions 5')
 
   def test_is_cached_enabled(self):
-    cache = self.get_cache(1)
+    cache = StateCache(1 << 20)
     self.assertEqual(cache.is_cache_enabled(), True)
-    self.verify_metrics(cache, {})
-    cache = self.get_cache(0)
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/1 MB, hit 100.00%, lookups 0, evictions 0')
+    cache = StateCache(0)
     self.assertEqual(cache.is_cache_enabled(), False)
-    self.verify_metrics(cache, {})
-
-  def verify_metrics(self, cache, expected_metrics):
-    infos = cache.get_monitoring_infos()
-    # Reconstruct metrics dictionary from monitoring infos
-    metrics = {
-        info.urn.rsplit(':',
-                        1)[1]: monitoring_infos.extract_gauge_value(info)[1]
-        for info in infos if "_total" not in info.urn and
-        info.type == monitoring_infos.LATEST_INT64_TYPE
-    }
-    self.assertDictEqual(metrics, expected_metrics)
-    # Metrics and total metrics should be identical for a single bundle.
-    # The following two gauges are not part of the total metrics:
-    try:
-      del metrics['capacity']
-      del metrics['size']
-    except KeyError:
-      pass
-    total_metrics = {
-        info.urn.rsplit(':', 1)[1].rsplit("_total")[0]:
-        monitoring_infos.extract_counter_value(info)
-        for info in infos
-        if "_total" in info.urn and info.type == monitoring_infos.SUM_INT64_TYPE
-    }
-    self.assertDictEqual(metrics, total_metrics)
-
-  @staticmethod
-  def get_cache(size):
-    cache = StateCache(size)
-    cache.initialize_metrics()
-    return cache
+    self.assertEqual(
+        cache.describe_stats(),
+        'used/max 0/0 MB, hit 100.00%, lookups 0, evictions 0')
+
+  def test_get_referents_for_cache(self):
+    class GetReferentsForCache(CacheAware):
+      def __init__(self):
+        self.key = bytearray(1 << 20)
+        self.value = bytearray(2 << 20)
+
+      def get_referents_for_cache(self):
+        return [self.key]
+
+    cache = StateCache(5 << 20)
+    cache.put("key", "cache_token", GetReferentsForCache())

Review Comment:
   Your understanding is correct. I renamed `key/value` to `measure_me/ignore_me`, which will help with test comprehension.
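
The effect of overriding `get_referents_for_cache` can be sketched as follows. This is an illustrative example under stated assumptions: `deep_size` is a simplified stand-in written for this example (it is not the `objsize` implementation), `_SKIPPED` and `Entry` are hypothetical names, and only `CacheAware` and `get_referents_for_cache` mirror the diff above.

```python
import gc
import sys
import types


class CacheAware(object):
  def get_referents_for_cache(self):
    """Returns the list of objects accounted during cache measurement."""
    raise NotImplementedError()


def get_referents_for_cache(*objs):
  """Uses the CacheAware override when present, else gc.get_referents."""
  rval = []
  for obj in objs:
    if isinstance(obj, CacheAware):
      rval.extend(obj.get_referents_for_cache())
    else:
      rval.extend(gc.get_referents(obj))
  return rval


# Shared objects that should not be charged to a single cache entry
# (an assumption of this sketch).
_SKIPPED = (type, types.ModuleType, types.FunctionType)


def deep_size(root, get_referents=get_referents_for_cache):
  """Sums sys.getsizeof over every object reachable from root."""
  seen = set()
  total = 0
  pending = [root]
  while pending:
    obj = pending.pop()
    if id(obj) in seen or isinstance(obj, _SKIPPED):
      continue
    seen.add(id(obj))
    total += sys.getsizeof(obj)
    pending.extend(get_referents(obj))
  return total


class Entry(CacheAware):
  def __init__(self):
    self.measure_me = bytearray(1 << 20)
    self.ignore_me = bytearray(2 << 20)

  def get_referents_for_cache(self):
    # Only measure_me is reported, so ignore_me is never traversed.
    return [self.measure_me]
```

Here `deep_size(Entry())` comes out slightly above 1 MB rather than above 3 MB, because the 2 MB `ignore_me` buffer is never visited.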





[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233413882

   Run Python PreCommit




[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1230737711

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r962018197


##########
sdks/python/apache_beam/runners/portability/flink_runner_test.py:
##########
@@ -296,95 +292,6 @@ def test_flattened_side_input(self):
   def test_metrics(self):
     super().test_metrics(check_gauge=False)
 
-  def test_flink_metrics(self):

Review Comment:
   This metrics test was only checking that the `...:statecache:...` metrics were there. Generating these metrics was removed with this PR.





[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233456983

   The metrics appear in the status information for the SDK. This is where most of this low-level information shows up today.
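
For reference, the status line can be reconstructed from the expected strings in `statecache_test.py`. The sketch below is inferred from those strings only: the free function and its parameters are hypothetical (in the PR, `describe_stats` is a method on `StateCache` called without arguments), and flooring to whole MB and reporting 100% when there are no lookups are assumptions that happen to match the test expectations.

```python
def describe_stats(
    current_weight, max_weight, hit_count, miss_count, evict_count):
  """Formats cache statistics as a single human-readable line."""
  request_count = hit_count + miss_count
  if request_count > 0:
    hit_ratio = 100.0 * hit_count / request_count
  else:
    # No lookups yet: report a perfect ratio, as the tests expect.
    hit_ratio = 100.0
  return 'used/max %d/%d MB, hit %.2f%%, lookups %d, evictions %d' % (
      current_weight >> 20,  # bytes to whole MB, rounded down
      max_weight >> 20,
      hit_ratio,
      request_count,
      evict_count)
```

For example, one hit and one miss against a 5 MB cache holding a 1 MB entry gives `used/max 1/5 MB, hit 50.00%, lookups 2, evictions 0`.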




[GitHub] [beam] lukecwik commented on pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22924:
URL: https://github.com/apache/beam/pull/22924#issuecomment-1233476017

   Run Python PreCommit




[GitHub] [beam] lukecwik merged pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22924:
URL: https://github.com/apache/beam/pull/22924




[GitHub] [beam] lukecwik commented on a diff in pull request #22924: [#19857] Migrate to using a memory aware cache within the Python SDK harness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22924:
URL: https://github.com/apache/beam/pull/22924#discussion_r957678980


##########
sdks/python/apache_beam/io/mongodbio_it_test.py:
##########
@@ -21,13 +21,12 @@
 import logging
 import time
 
-from pymongo import MongoClient

Review Comment:
   Note that I had to fix the mongodb imports due to pylint failing for me even though it passes at head.


