Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/05 23:48:46 UTC

[GitHub] [kafka] ableegoldman commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache

ableegoldman commented on code in PR #12903:
URL: https://github.com/apache/kafka/pull/12903#discussion_r1040224575


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -81,9 +85,11 @@ public synchronized void resize(final long newCacheSizeBytes) {
                 return;
             }
             final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
-            while (sizeBytes() > maxCacheSizeBytes) {
+            while (sizeInBytes.get() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
+                final long oldSize = cache.sizeInBytes();

Review Comment:
   Probably a relatively unimportant optimization, but we could save two `synchronized` calls by having `#evict` return the number of bytes it evicted from that NamedCache. Just a thought.
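Here is a minimal sketch of this suggestion. The classes `EvictSketchCache` and `EvictSketchThreadCache` and their methods are hypothetical stand-ins, not Kafka's actual `NamedCache`/`ThreadCache` API. In the sketch, `evict()` returns the bytes it freed. The round-robin shrink loop can then update the `AtomicLong` total from that return value, with no `sizeInBytes()` call before and after each eviction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for NamedCache: each entry is represented only by its size in bytes.
class EvictSketchCache {
    private final Deque<Long> lruSizes = new ArrayDeque<>(); // head = least recently used
    private long sizeInBytes = 0;

    // Adds an entry and returns the change in bytes (here simply the entry size).
    synchronized long put(final long entryBytes) {
        lruSizes.addLast(entryBytes);
        sizeInBytes += entryBytes;
        return entryBytes;
    }

    // Evicts the least recently used entry and returns the bytes freed (0 if empty),
    // so callers need no extra synchronized sizeInBytes() calls around it.
    synchronized long evict() {
        final Long freed = lruSizes.pollFirst();
        if (freed == null) {
            return 0L;
        }
        sizeInBytes -= freed;
        return freed;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical thread-level cache that keeps a running total in an AtomicLong.
class EvictSketchThreadCache {
    private final List<EvictSketchCache> caches = new ArrayList<>();
    private final AtomicLong sizeInBytes = new AtomicLong();

    void add(final EvictSketchCache cache) {
        caches.add(cache);
        sizeInBytes.addAndGet(cache.sizeInBytes());
    }

    void put(final EvictSketchCache cache, final long entryBytes) {
        sizeInBytes.addAndGet(cache.put(entryBytes));
    }

    // Round-robin eviction until under budget, in the spirit of the resize loop in the diff.
    void shrinkTo(final long maxCacheSizeBytes) {
        int index = 0;
        int emptyInARow = 0; // stops the loop once a full pass finds every cache empty
        while (sizeInBytes.get() > maxCacheSizeBytes && emptyInARow < caches.size()) {
            final long freed = caches.get(index).evict();
            index = (index + 1) % caches.size();
            emptyInARow = freed == 0 ? emptyInARow + 1 : 0;
            sizeInBytes.addAndGet(-freed);
        }
    }

    long sizeBytes() {
        return sizeInBytes.get();
    }
}
```

Each loop iteration then costs one synchronized call on the named cache instead of three.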



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -186,7 +206,13 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
             return null;
         }
 
-        return cache.delete(key);
+        final LRUCacheEntry entry;
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            entry = cache.delete(key);

Review Comment:
   In this case we can't return the change in bytes (since `delete` already returns the removed entry). However, `delete` has no side effects that could affect the cache size. That is unlike `put`, which may trigger an eviction, so its delta isn't just the bytes passed in to `put`. We can therefore compute the change in size directly by subtracting the size of the deleted entry, which should be known, right?
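A minimal sketch of the idea follows. The classes are hypothetical, and it assumes an entry's footprint is simply its key bytes plus value bytes. If the real accounting adds per-entry overhead, the same formula would have to be used when subtracting. `put` keeps the before/after measurement under the lock, as the diff does, because it may have side effects. `delete` derives the delta from the removed entry alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical cache; an entry's footprint is assumed to be key bytes + value bytes.
class DeleteSketchCache {
    private final Map<String, byte[]> entries = new HashMap<>();
    private long sizeInBytes = 0;

    static long footprint(final String key, final byte[] value) {
        return key.getBytes(StandardCharsets.UTF_8).length + value.length;
    }

    synchronized void put(final String key, final byte[] value) {
        final byte[] old = entries.put(key, value);
        sizeInBytes += footprint(key, value) - (old == null ? 0 : footprint(key, old));
    }

    // Like delete in the diff, returns the removed entry (or null) rather than a byte delta.
    synchronized byte[] delete(final String key) {
        final byte[] removed = entries.remove(key);
        if (removed != null) {
            sizeInBytes -= footprint(key, removed);
        }
        return removed;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

class DeleteSketchThreadCache {
    final DeleteSketchCache cache = new DeleteSketchCache();
    private final AtomicLong sizeInBytes = new AtomicLong();

    // put may have side effects in general, so measure before/after under the lock.
    void put(final String key, final byte[] value) {
        synchronized (cache) {
            final long oldSize = cache.sizeInBytes();
            cache.put(key, value);
            sizeInBytes.addAndGet(cache.sizeInBytes() - oldSize);
        }
    }

    // delete has no side effects: the delta is just the removed entry's footprint,
    // so no extra lock or sizeInBytes() calls are needed.
    byte[] delete(final String key) {
        final byte[] removed = cache.delete(key);
        if (removed != null) {
            sizeInBytes.addAndGet(-DeleteSketchCache.footprint(key, removed));
        }
        return removed;
    }

    long sizeBytes() {
        return sizeInBytes.get();
    }
}
```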



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -158,15 +169,24 @@ public void put(final String namespace, final Bytes key, final LRUCacheEntry val
         numPuts++;
 
         final NamedCache cache = getOrCreateCache(namespace);
-        cache.put(key, value);
-        maybeEvict(namespace);
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.put(key, value);

Review Comment:
   Ditto here as well: just return the change in bytes after the put?
   
   Same for all applicable cases. I'll stop commenting on every one, but you get the gist.
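A minimal sketch of a `put` that returns its net byte delta follows. The classes are hypothetical, and in this sketch eviction happens inside the named cache when a per-cache capacity is exceeded. The returned value accounts for the new entry, any replaced value, and anything evicted as a side effect. The caller therefore makes one synchronized call and adds the result to the `AtomicLong` total.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical size-bounded LRU cache; map values are entry sizes in bytes.
class PutSketchCache {
    // accessOrder = true: iteration starts at the least recently used entry
    private final LinkedHashMap<String, Long> lru = new LinkedHashMap<>(16, 0.75f, true);
    private final long capacityBytes;
    private long sizeInBytes = 0;

    PutSketchCache(final long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // Returns the net change in bytes: the new entry, minus any value it replaced,
    // minus anything evicted because the put pushed the cache over capacity.
    synchronized long put(final String key, final long entryBytes) {
        final long before = sizeInBytes;
        final Long replaced = lru.put(key, entryBytes);
        sizeInBytes += entryBytes - (replaced == null ? 0L : replaced);
        final Iterator<Map.Entry<String, Long>> it = lru.entrySet().iterator();
        while (sizeInBytes > capacityBytes && it.hasNext()) {
            final Map.Entry<String, Long> eldest = it.next();
            if (eldest.getKey().equals(key)) {
                continue; // never evict the entry that was just written
            }
            sizeInBytes -= eldest.getValue();
            it.remove();
        }
        return sizeInBytes - before;
    }

    synchronized boolean containsKey(final String key) {
        return lru.containsKey(key);
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

class PutSketchThreadCache {
    final PutSketchCache cache;
    private final AtomicLong sizeInBytes = new AtomicLong();

    PutSketchThreadCache(final PutSketchCache cache) {
        this.cache = cache;
    }

    // One synchronized call on the named cache; no sizeInBytes() before/after.
    void put(final String key, final long entryBytes) {
        sizeInBytes.addAndGet(cache.put(key, entryBytes));
    }

    long sizeBytes() {
        return sizeInBytes.get();
    }
}
```

Because the delta is computed inside the cache's own lock, it already reflects the eviction side effect that makes "bytes passed in" the wrong number.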



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -133,7 +139,12 @@ public void flush(final String namespace) {
         if (cache == null) {
             return;
         }
-        cache.flush();
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.flush();

Review Comment:
   Ditto here: `#flush` could return the number of bytes flushed/evicted, although I guess in this case the overhead saved is smaller because we already hold the lock on the `cache` itself.
   
   Still, always returning the number of bytes that changed, for any call whose return type was `void` anyway, might encourage more sensible bookkeeping of the cache sizes.
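A minimal sketch of a `flush` that returns its byte delta follows. The classes are hypothetical. The sketch assumes that flushing forwards every dirty entry and then drops flushed tombstones (null values), which is the only way this sketch's flush changes the size; the diff does not show the real `NamedCache#flush` behavior. Both mutators return their delta instead of `void`, so the thread-level counter is updated the same way everywhere.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical cache: a null value is a tombstone, and new writes are dirty until flushed.
class FlushSketchCache {
    private static final class Entry {
        final byte[] value; // null = tombstone
        boolean dirty = true;

        Entry(final byte[] value) {
            this.value = value;
        }
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();
    private long sizeInBytes = 0;

    // Assumed footprint: key bytes plus value bytes (0 for a tombstone).
    private static long footprint(final String key, final byte[] value) {
        return key.getBytes(StandardCharsets.UTF_8).length + (value == null ? 0 : value.length);
    }

    // Returns the change in bytes instead of void.
    synchronized long put(final String key, final byte[] value) {
        final Entry old = entries.put(key, new Entry(value));
        final long delta = footprint(key, value) - (old == null ? 0 : footprint(key, old.value));
        sizeInBytes += delta;
        return delta;
    }

    // Sends dirty keys to the sink, then drops flushed tombstones; returns the change in bytes.
    synchronized long flush(final List<String> sink) {
        long delta = 0;
        final Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, Entry> e = it.next();
            if (!e.getValue().dirty) {
                continue;
            }
            sink.add(e.getKey()); // stand-in for forwarding the record downstream
            e.getValue().dirty = false;
            if (e.getValue().value == null) {
                delta -= footprint(e.getKey(), null);
                it.remove();
            }
        }
        sizeInBytes += delta;
        return delta;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

class FlushSketchThreadCache {
    final FlushSketchCache cache = new FlushSketchCache();
    private final AtomicLong sizeInBytes = new AtomicLong();

    void put(final String key, final byte[] value) {
        sizeInBytes.addAndGet(cache.put(key, value));
    }

    void flush(final List<String> sink) {
        sizeInBytes.addAndGet(cache.flush(sink));
    }

    long sizeBytes() {
        return sizeInBytes.get();
    }
}
```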



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -241,35 +267,30 @@ private boolean isOverflowing(final long size) {
     }
 
     long sizeBytes() {
-        long sizeInBytes = 0;
-        for (final NamedCache namedCache : caches.values()) {
-            sizeInBytes += namedCache.sizeInBytes();
-            if (isOverflowing(sizeInBytes)) {
-                return Long.MAX_VALUE;
-            }
-        }
-        return sizeInBytes;
+        return sizeInBytes.get();
     }
 
     synchronized void close(final String namespace) {
         final NamedCache removed = caches.remove(namespace);
         if (removed != null) {
+            sizeInBytes.getAndAdd(-removed.sizeInBytes());
             removed.close();
         }
     }
 
-    private void maybeEvict(final String namespace) {
+    private void maybeEvict(final String namespace, final NamedCache cache) {

Review Comment:
   Is `namespace` unused now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
