Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/08 02:30:52 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

ableegoldman commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r703973474



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {
+            return;
+        }
+        final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+        while (sizeBytes() > maxCacheSizeBytes) {
+            final NamedCache cache = circularIterator.next();
+            cache.evict();
+            numEvicts.incrementAndGet();
+        }

Review comment:
       nit: we do the same thing in the other `#resize` for thread count changes; can you factor it out into a helper method? Then I think we can narrow the scope and make only that helper synchronized (should double-check that, though).
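A minimal sketch of the suggested refactor, assuming a simplified model: `FakeNamedCache`, `ThreadCacheSketch` and `setLimitAndEvict` are hypothetical names, not Kafka classes, and the per-task `setMaxBytes` loop from the diff is left out. Both `resize` overloads compute the new limit locally and hand it to one synchronized helper that stores it and evicts round-robin until the total fits. Keeping the limit write inside the helper matters if the outer methods lose their `synchronized`; whether that narrowing is safe for the real `ThreadCache` still needs the double-check the comment asks for.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's internal NamedCache: tracks only a byte count
// and frees a fixed number of bytes per evict() call.
class FakeNamedCache {
    private long sizeBytes;
    private final long bytesPerEvict;

    FakeNamedCache(final long sizeBytes, final long bytesPerEvict) {
        this.sizeBytes = sizeBytes;
        this.bytesPerEvict = bytesPerEvict;
    }

    long sizeInBytes() {
        return sizeBytes;
    }

    void evict() {
        sizeBytes = Math.max(0L, sizeBytes - bytesPerEvict);
    }
}

// Hypothetical sketch: both resize paths share one eviction helper.
class ThreadCacheSketch {
    final Map<String, FakeNamedCache> caches = new LinkedHashMap<>();
    private long maxCacheSizeBytes;
    private long numEvicts = 0L;

    ThreadCacheSketch(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    // Thread-count change: one new global limit.
    void resize(final long newCacheSizeBytes) {
        setLimitAndEvict(newCacheSizeBytes);
    }

    // Per-task limits: the global limit is their sum (same reduce as in the diff).
    void resize(final Map<String, Long> newCacheSizes) {
        setLimitAndEvict(newCacheSizes.values().stream().reduce(0L, Long::sum));
    }

    // The only synchronized mutation: store the limit, then evict round-robin
    // across all caches until the total size fits under it.
    private synchronized void setLimitAndEvict(final long newLimit) {
        maxCacheSizeBytes = newLimit;
        if (caches.isEmpty()) {
            return;
        }
        Iterator<FakeNamedCache> it = caches.values().iterator();
        while (sizeBytes() > maxCacheSizeBytes) {
            if (!it.hasNext()) {
                it = caches.values().iterator(); // wrap around, like CircularIterator
            }
            it.next().evict();
            numEvicts++;
        }
    }

    synchronized long sizeBytes() {
        long total = 0L;
        for (final FakeNamedCache cache : caches.values()) {
            total += cache.sizeInBytes();
        }
        return total;
    }

    synchronized long numEvicts() {
        return numEvicts;
    }
}
```

With two 400-byte caches and a 500-byte limit, the helper evicts three times (first cache, second cache, first cache again) and stops at exactly 500 bytes.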

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {

Review comment:
       Any reason this checks emptiness of `caches.values()` instead of `caches.keys()`?
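For a `java.util.Map`, the value collection, the key view (exposed by the `Map` interface as `keySet()`) and the map itself are empty at exactly the same moment, so the three checks below always agree and `caches.isEmpty()` states the intent most directly. A small self-contained illustration; `EmptinessChecks` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: evaluates three equivalent emptiness checks on a map.
class EmptinessChecks {
    static boolean[] checks(final Map<String, ?> caches) {
        return new boolean[] {
            caches.values().isEmpty(), // the check used in the PR
            caches.keySet().isEmpty(), // the key-view check
            caches.isEmpty()           // the most direct form
        };
    }

    public static void main(final String[] args) {
        final Map<String, Object> caches = new HashMap<>();
        System.out.println(Arrays.toString(checks(caches))); // [true, true, true]
        caches.put("0_0-store", new Object());
        System.out.println(Arrays.toString(checks(caches))); // [false, false, false]
    }
}
```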

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -43,7 +44,7 @@
     // internal stats
     private long numPuts = 0;
     private long numGets = 0;
-    private long numEvicts = 0;
+    private AtomicLong numEvicts = new AtomicLong(0);

Review comment:
       Why make this atomic? We're still only ever evicting/accessing this from the actual StreamThread, right?
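A sketch of the reasoning behind the question, assuming the counter is only written by one thread: a plain `long` is enough, and `Thread.join()` (like any other happens-before edge) makes the final value visible to the thread that reads it afterwards. `AtomicLong` only earns its cost when several threads increment concurrently, or when another thread reads the value without any such synchronization point. `EvictCounterDemo` and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo: a thread-confined plain long versus a shared AtomicLong.
class EvictCounterDemo {
    private long numEvicts = 0L;                              // written by one thread only
    private final AtomicLong sharedEvicts = new AtomicLong(); // safe for concurrent writers

    // One writer thread increments a plain long; join() publishes the result.
    static long confinedCount(final int n) {
        final EvictCounterDemo demo = new EvictCounterDemo();
        final Thread streamThread = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                demo.numEvicts++;
            }
        });
        streamThread.start();
        joinOrFail(streamThread); // happens-before: all writes above are visible below
        return demo.numEvicts;
    }

    // Several writer threads: here the counter genuinely needs to be atomic.
    static long concurrentCount(final int threads, final int n) {
        final EvictCounterDemo demo = new EvictCounterDemo();
        final Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < n; i++) {
                    demo.sharedEvicts.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (final Thread worker : workers) {
            joinOrFail(worker);
        }
        return demo.sharedEvicts.get();
    }

    // Waits for the thread; restores the interrupt flag and fails if interrupted.
    private static void joinOrFail(final Thread thread) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining " + thread.getName(), e);
        }
    }
}
```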

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -502,7 +504,8 @@ public StreamThread(final Time time,
         this.assignmentErrorCode = assignmentErrorCode;
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
-        this.cacheResizer = cacheResizer;
+        this.threadCache = threadCache;
+        cacheSizes = new ConcurrentHashMap<>();

Review comment:
       Does this need to be a concurrent map? It seems to only be accessed by the StreamThread itself.
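If `cacheSizes` really is touched only by its StreamThread, a plain `HashMap` is enough. One hedged option is to make that assumption explicit and fail fast when it is violated; `ThreadConfinedCacheSizes` below is a hypothetical wrapper, not Kafka code. The owning thread is passed in rather than captured in the constructor, because a thread object's constructor runs on the thread that creates it, not on the thread it will later become; publication of the map to that thread is covered by the happens-before edge of `Thread.start()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper: a plain HashMap guarded by a thread-confinement check
// instead of a ConcurrentHashMap.
class ThreadConfinedCacheSizes {
    private final Thread owner;
    private final Map<String, Long> cacheSizes = new HashMap<>();

    // The owner is passed in explicitly; see the lead-in for why it is not
    // taken from Thread.currentThread() at construction time.
    ThreadConfinedCacheSizes(final Thread owner) {
        this.owner = owner;
    }

    private void checkOwner() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("cacheSizes touched by " + Thread.currentThread().getName()
                + " but owned by " + owner.getName());
        }
    }

    void put(final String taskId, final long maxBytes) {
        checkOwner();
        cacheSizes.put(taskId, maxBytes);
    }

    // Same aggregation as ThreadCache#resize(Map) in the diff above.
    long totalBytes() {
        checkOwner();
        return cacheSizes.values().stream().reduce(0L, Long::sum);
    }
}
```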

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -247,4 +247,6 @@ default boolean commitRequested() {
      * @return This returns the time the task started idling. If it is not idling it returns empty.
      */
     Optional<Long> timeCurrentIdlingStarted();
+
+    long maxBuffer();

Review comment:
       This should probably specify what kind of buffer in the name (especially with KIP-770 adding another relevant buffer type).
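A purely illustrative naming sketch: the names `maxCacheSizeBytes` and `maxInputBufferBytes` are hypothetical and not taken from the PR or from KIP-770. The point is only that each accessor says which buffer its limit applies to and in which unit, so two byte limits on the same task cannot be confused.

```java
// Hypothetical naming sketch: each limit names its buffer and its unit.
interface TaskBufferLimits {
    /** Maximum bytes this task's state-store record cache may use. */
    long maxCacheSizeBytes();

    /** Maximum bytes of buffered, not-yet-processed input records for this task. */
    long maxInputBufferBytes();
}

// Hypothetical fixed-value implementation, e.g. for tests.
final class FixedTaskBufferLimits implements TaskBufferLimits {
    private final long cacheBytes;
    private final long inputBufferBytes;

    FixedTaskBufferLimits(final long cacheBytes, final long inputBufferBytes) {
        this.cacheBytes = cacheBytes;
        this.inputBufferBytes = inputBufferBytes;
    }

    @Override
    public long maxCacheSizeBytes() {
        return cacheBytes;
    }

    @Override
    public long maxInputBufferBytes() {
        return inputBufferBytes;
    }
}
```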




-- 
This is an automated message from the Apache Git Service.