Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/27 18:59:01 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

wcarlson5 opened a new pull request #11278:
URL: https://github.com/apache/kafka/pull/11278


   Make the max buffer cache size settable for a named topology
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ableegoldman commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r703973474



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {
+            return;
+        }
+        final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+        while (sizeBytes() > maxCacheSizeBytes) {
+            final NamedCache cache = circularIterator.next();
+            cache.evict();
+            numEvicts.incrementAndGet();
+        }

Review comment:
       nit: we do this same thing in the other `#resize` for thread count changes; can you factor it out into a helper method? Then I think we can narrow the scope and make only that helper synchronized (we should double-check that, though)
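       A minimal sketch of what that extraction could look like (the helper name here is hypothetical, and it assumes the existing `caches`, `sizeBytes()`, `maxCacheSizeBytes`, and `numEvicts` fields of `ThreadCache`):

           // Hypothetical helper that both resize() overloads could call after updating
           // maxCacheSizeBytes; synchronization could then potentially be narrowed to
           // this method alone (pending the double-check mentioned above).
           private synchronized void evictWhileOverCapacity() {
               if (caches.isEmpty()) {
                   return;
               }
               final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
               while (sizeBytes() > maxCacheSizeBytes) {
                   final NamedCache cache = circularIterator.next();
                   cache.evict();
                   numEvicts.incrementAndGet();
               }
           }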

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {

Review comment:
       Any reason this checks emptiness of `caches.values()` instead of `caches.keys()`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -43,7 +44,7 @@
     // internal stats
     private long numPuts = 0;
     private long numGets = 0;
-    private long numEvicts = 0;
+    private AtomicLong numEvicts = new AtomicLong(0);

Review comment:
       Why make this atomic? We're still only ever evicting/accessing this from the actual StreamThread, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -502,7 +504,8 @@ public StreamThread(final Time time,
         this.assignmentErrorCode = assignmentErrorCode;
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
-        this.cacheResizer = cacheResizer;
+        this.threadCache = threadCache;
+        cacheSizes = new ConcurrentHashMap<>();

Review comment:
       Does this need to be a concurrent map? Seems to only be accessed by the StreamThread itself

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -247,4 +247,6 @@ default boolean commitRequested() {
      * @return This returns the time the task started idling. If it is not idling it returns empty.
      */
     Optional<Long> timeCurrentIdlingStarted();
+
+    long maxBuffer();

Review comment:
       Should probably specify what kind of buffer in the name (esp. with KIP-770 adding another relevant buffer type)
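       For example (the exact name here is just a hypothetical suggestion, not something decided in this PR):

           // Hypothetical, more specific name tying this to the record cache
           // (cache.max.bytes.buffering) rather than the KIP-770 input buffer:
           long maxCacheBufferBytes();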







[GitHub] [kafka] guozhangwang commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r704796291



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -233,6 +234,14 @@ public boolean hasNamedTopologies() {
         return !builders.containsKey(UNNAMED_TOPOLOGY);
     }
 
+    /**
+     * @return true iff the app is using named topologies, or was started up with no topology at all
+     * and the max buffer was set for the named topologies

Review comment:
       This may be related to @ableegoldman's meta question: do we set `maxBufferSize = true` in the future if one of the named topologies has it overridden, or only when all topologies inside the `TopologyMetadata` have this config overridden?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -151,9 +152,26 @@ void handleRebalanceComplete() {
 
         releaseLockedUnassignedTaskDirectories();
 
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final Task task : tasks.allTasks()) {
+                tasksTotalMaxBuffer.put(
+                    task.id().topologyName(),
+                    task.maxBuffer() + tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L)

Review comment:
       Not sure I understand this logic: why do we add these two values to update the `tasksTotalMaxBuffer`? How would `task.maxBuffer()` be inferred in the future? Since right now they are only 0, I cannot tell how this would impact the update logic.
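       If I'm reading it right, the loop is just accumulating a per-topology sum of each task's value. A small standalone illustration of that accumulation pattern (topology names and byte values invented for illustration):

           import java.util.HashMap;
           import java.util.Map;

           public class TotalMaxBufferExample {
               public static void main(final String[] args) {
                   final Map<String, Long> tasksTotalMaxBuffer = new HashMap<>();

                   // Stand-ins for task.id().topologyName() and task.maxBuffer().
                   final Object[][] tasks = {{"topology-A", 10L}, {"topology-A", 20L}, {"topology-B", 5L}};

                   for (final Object[] task : tasks) {
                       final String topologyName = (String) task[0];
                       final long maxBuffer = (Long) task[1];
                       // Same effect as put(name, maxBuffer + getOrDefault(name, 0L)) in the diff.
                       tasksTotalMaxBuffer.merge(topologyName, maxBuffer, Long::sum);
                   }

                   System.out.println(tasksTotalMaxBuffer); // e.g. {topology-A=30, topology-B=5}
               }
           }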







[GitHub] [kafka] wcarlson5 commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-920174685


   @guozhangwang That is a really good point. Maybe we should be more specific. What if each topology could request a percentage of the total cache? If a request made the total exceed 99%, the request would be rejected. Any unclaimed cache would be split among the topologies that did not claim any. A topology could lower its cache size if it wants to make space for a new topology.
   
   1. If A requests 50%, it gets it and the rest is unused
   2. B joins but does not request, so it gets the other 50%
   3. C joins but requests 75%, so it fails. C then requests 25%, so now A has 50%, B 25%, and C 25%
   4. D joins without a request, so now A has 50%, B 12%, C 25%, and D 13%
   5. A reduces its request to 25%, so now all have 25%
   6. E joins and requests 0%, using no cache, and all other topologies are unchanged
   
   I think that should work. Now we have both an upper bound on total memory and a minimum guarantee.
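   A rough sketch of that allocation rule (class and method names invented; this is not code in this PR):

       import java.util.LinkedHashMap;
       import java.util.Map;

       // Hypothetical allocator for the percentage-based idea above.
       public class CachePercentageAllocator {
           // topology name -> requested percentage, or null if no request was made
           private final Map<String, Integer> requests = new LinkedHashMap<>();

           // Rejects a topology whose request would push the claimed total past 99%.
           public boolean addTopology(final String name, final Integer requestedPercent) {
               if (requestedPercent != null && claimedPercent() + requestedPercent > 99) {
                   return false;
               }
               requests.put(name, requestedPercent);
               return true;
           }

           // Topologies that claimed a percentage get exactly that; any unclaimed space
           // is split evenly among the topologies that made no request.
           public Map<String, Double> allocation() {
               final long unclaimedCount = requests.values().stream().filter(r -> r == null).count();
               final double share = unclaimedCount == 0 ? 0.0 : (100.0 - claimedPercent()) / unclaimedCount;
               final Map<String, Double> result = new LinkedHashMap<>();
               requests.forEach((name, pct) -> result.put(name, pct != null ? pct : share));
               return result;
           }

           private int claimedPercent() {
               return requests.values().stream().filter(r -> r != null).mapToInt(Integer::intValue).sum();
           }
       }

   Walking the numbered example through this: after A (50%), B (no request), C (25%), and D (no request) are added, `allocation()` gives A 50, C 25, and B and D 12.5 each, matching steps 1 through 4 above modulo rounding.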





[GitHub] [kafka] guozhangwang commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-920445983


   > I think that should work. Now we have both an upper bound on total memory and a minimum guarantee.
   
   Yeah, I feel more comfortable with this proposal :) Basically, I think that instead of defining the config as an "override" of the per-application config, we should consider having a separate config at the per-topology level (e.g. your proposed one, based on a percentage of the per-application total). That way users can specify clearly what they want, and get exactly what they specified.





[GitHub] [kafka] guozhangwang commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-919601918


   > @guozhangwang I think there are definitely questions to be answered about this, and about what we really want the user to achieve. I think the maxbufferedbytes is really being used for two purposes, and perhaps we can split it out into 2 different things that will give a better result: I think it bounds the heap space and it reserves an amount of space for each thread.
   > It might make sense to have the users set a bound on the heap for the cluster. Then each topology could reserve some fraction of that space (either as a percentage or a number of bytes), and any unclaimed space can be split among the tasks (or just the tasks that have not reserved space). I think this would clear up some confusion about what this config is for. WDYT? cc/ @ableegoldman
   
   Hey @wcarlson5 sorry I'm late replying here. I think at a high level the two purposes you proposed make sense to me; it's just that in practice it may be hard to implement them in a simple and elegant way. To dig in, let me throw out some scenarios: say the instance's total cache size is configured as 100, with no topology added yet.
   
   1. If a topology A is added with that config overridden as 150, should we just silently accept it but only give it 100, or should we reject that topology?
   2. Assume we accept A as in 1) above. If a topology B without the config overridden is added, how should we distribute the cache between the two topologies? If we just consider B as 100 (the instance's config value), should we distribute the cache as 60:40 among A and B? And if another C with the config overridden as 250 is added, should we redistribute the total cache size as 30:20:50 among A / B / C?
   3. Assume we reject A as in 1) above, and suppose B without the config overridden is added first, which would get all of 100, and then later D with the config overridden as 40 is added. Would we distribute 100 as 30:70 (i.e. D first gets 40, and then B and D split the remaining 60)?
   4. Assume that we have multiple threads. Do we dynamically change the cache size allocation to each thread upon rebalance, based on which tasks of A / B / C each thread hosts?
   
   Thinking about all that, I'm a bit concerned that this config would be very hard for users to understand: it seems that either way "you do not get what you specified", and the overridden value would just be used as a relative ratio. Plus, if we distribute the cache among threads according to the topologies' specified distribution, that would get even more cumbersome to understand.
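   To make the arithmetic in scenario 2 concrete, here is a small standalone sketch (names invented, not proposing this as the implementation) of splitting a fixed instance cache proportionally to each topology's overridden (or defaulted) value:

       import java.util.LinkedHashMap;
       import java.util.Map;

       public class ProportionalCacheSplit {
           // Splits totalCacheBytes proportionally to each topology's requested bytes;
           // topologies without an override are treated as requesting the instance default.
           static Map<String, Long> split(final long totalCacheBytes, final Map<String, Long> requested) {
               final long sum = requested.values().stream().mapToLong(Long::longValue).sum();
               final Map<String, Long> result = new LinkedHashMap<>();
               requested.forEach((name, bytes) -> result.put(name, totalCacheBytes * bytes / sum));
               return result;
           }

           public static void main(final String[] args) {
               final Map<String, Long> requested = new LinkedHashMap<>();
               requested.put("A", 150L); // overridden to 150
               requested.put("B", 100L); // no override, treated as the instance value 100
               System.out.println(split(100, requested)); // {A=60, B=40}

               requested.put("C", 250L); // overridden to 250
               System.out.println(split(100, requested)); // {A=30, B=20, C=50}
           }
       }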





[GitHub] [kafka] wcarlson5 commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-916170147


   @guozhangwang I think there are definitely questions to be answered about this, and about what we really want the user to achieve. I think the maxbufferedbytes is really being used for two purposes, and perhaps we can split it out into 2 different things that will give a better result: I think it bounds the heap space and it reserves an amount of space for each thread.
   It might make sense to have the users set a bound on the heap for the cluster. Then each topology could reserve some fraction of that space (either as a percentage or a number of bytes), and any unclaimed space can be split among the tasks (or just the tasks that have not reserved space). I think this would clear up some confusion about what this config is for. WDYT? cc/ @ableegoldman





[GitHub] [kafka] wcarlson5 commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-907410140


   @ableegoldman here is my idea for `cache.max.bytes.buffering` for named topologies. I will need to make a few changes once https://github.com/apache/kafka/pull/11272 gets in, but the main idea is here. Can you give it a look?
   
   cc/ @guozhangwang 

