Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/10 12:00:03 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520445787



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private Long totalCacheSize;

Review comment:
       I think this can be a `final long` if we remove the check, as I propose below.
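A minimal sketch of what the suggestion amounts to, assuming the total is read from the config once in the constructor and the negative-size check is gone. The class name `CacheSizeHolder` and the `hasGlobalThread` flag are hypothetical, and splitting the total evenly across the stream threads plus an optional global thread is an assumption about how the per-thread size is computed, not a quote of the PR.

```java
// Hypothetical stand-in for the relevant part of KafkaStreams.
final class CacheSizeHolder {
    // Assigned exactly once in the constructor, so a primitive final long suffices;
    // no later check needs to overwrite it.
    private final long totalCacheSize;
    // Assumption: a global thread, if present, also receives a share of the cache.
    private final boolean hasGlobalThread;

    CacheSizeHolder(final long totalCacheSize, final boolean hasGlobalThread) {
        this.totalCacheSize = totalCacheSize;
        this.hasGlobalThread = hasGlobalThread;
    }

    // Splits the total evenly; integer division drops any remainder bytes.
    long getCacheSizePerThread(final int numStreamThreads) {
        final int threads = numStreamThreads + (hasGlobalThread ? 1 : 0);
        if (threads == 0) {
            return totalCacheSize;
        }
        return totalCacheSize / threads;
    }

    public static void main(final String[] args) {
        final CacheSizeHolder holder = new CacheSizeHolder(10 * 1024 * 1024L, false);
        System.out.println(holder.getCacheSizePerThread(4)); // 2621440
    }
}
```

Because the field is final, the compiler guarantees that nothing after construction can change the configured total.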

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,21 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        if (totalCacheSize < 0) {
+            totalCacheSize = 0L;
+            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+        }

Review comment:
       I see that this check existed before, but I don't think it is needed: the configs are validated, and `CACHE_MAX_BYTES_BUFFERING_CONFIG` is defined with a minimum value of 0, so a negative value never reaches this method.
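To illustrate why the runtime check is unreachable, here is a hypothetical, self-contained stand-in for the "at least 0" validation. In Kafka this is declared on the config definition (presumably via a `ConfigDef.Range.atLeast(0)` validator, which throws a `ConfigException`); the sketch below hand-writes the same rule and throws `IllegalArgumentException` only to avoid depending on Kafka. `validateAtLeastZero` is not a Kafka method.

```java
// Hypothetical stand-in for declarative config validation; not Kafka's API.
final class CacheConfigValidation {
    static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";

    // Rejects values below 0 when the config is parsed, so a negative cache size
    // never reaches the KafkaStreams constructor.
    static long validateAtLeastZero(final String name, final long value) {
        if (value < 0) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration " + name + ": must be at least 0");
        }
        return value;
    }

    public static void main(final String[] args) {
        System.out.println(validateAtLeastZero(CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L)); // 0
        try {
            validateAtLeastZero(CACHE_MAX_BYTES_BUFFERING_CONFIG, -1L);
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Since the value is rejected before any `KafkaStreams` instance exists, the "Reverting to cache size of 0 bytes" branch in the diff can never execute.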

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +71,16 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            for (final NamedCache cache : caches.values()) {
+                maybeEvict(cache.name());
+            }

Review comment:
       This loop has a disadvantage: it first evicts entries from one named cache, and only if all of that cache's entries are evicted and we still need to free space does it move on to the next named cache, and so on. I think it would be better to avoid such a skewed emission of records downstream by iterating over the named caches repeatedly and evicting one entry at a time from each named cache until enough space is freed.
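A minimal sketch of the round-robin eviction proposed above, under simplifying assumptions: each named cache is modelled as a `LinkedHashMap` from key to entry size in insertion order (oldest first), and "evicting" just records the entry instead of forwarding it downstream. `RoundRobinEvictionSketch` and its methods are hypothetical and do not mirror the real `ThreadCache`/`NamedCache` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a thread cache made of several named caches.
final class RoundRobinEvictionSketch {
    private final Map<String, LinkedHashMap<String, Long>> caches = new LinkedHashMap<>();
    private final List<String> evicted = new ArrayList<>();
    private long maxCacheSizeBytes;
    private long sizeBytes;

    RoundRobinEvictionSketch(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    // This sketch only enforces the limit on resize, not on put.
    void put(final String cacheName, final String key, final long size) {
        final Long old = caches.computeIfAbsent(cacheName, n -> new LinkedHashMap<>()).put(key, size);
        sizeBytes += size - (old == null ? 0 : old);
    }

    void resize(final long newMaxCacheSizeBytes) {
        final boolean shrink = newMaxCacheSizeBytes < maxCacheSizeBytes;
        maxCacheSizeBytes = newMaxCacheSizeBytes;
        if (shrink) {
            evictRoundRobin();
        }
    }

    // Evicts the oldest entry of each named cache in turn until the total fits,
    // instead of draining one named cache completely before touching the next.
    private void evictRoundRobin() {
        boolean evictedAny = true;
        while (sizeBytes > maxCacheSizeBytes && evictedAny) {
            evictedAny = false; // stops the loop once every named cache is empty
            for (final Map.Entry<String, LinkedHashMap<String, Long>> cache : caches.entrySet()) {
                if (sizeBytes <= maxCacheSizeBytes) {
                    return;
                }
                final Iterator<Map.Entry<String, Long>> it = cache.getValue().entrySet().iterator();
                if (it.hasNext()) {
                    final Map.Entry<String, Long> oldest = it.next();
                    sizeBytes -= oldest.getValue();
                    // A real cache would forward this entry downstream here.
                    evicted.add(cache.getKey() + ":" + oldest.getKey());
                    it.remove();
                    evictedAny = true;
                }
            }
        }
    }

    List<String> evicted() {
        return evicted;
    }

    long sizeBytes() {
        return sizeBytes;
    }

    public static void main(final String[] args) {
        final RoundRobinEvictionSketch cache = new RoundRobinEvictionSketch(100);
        for (int i = 0; i < 3; i++) {
            cache.put("store-a", "a" + i, 10);
            cache.put("store-b", "b" + i, 10);
        }
        cache.resize(30); // 60 bytes cached, so 30 bytes must be freed
        System.out.println(cache.evicted()); // [store-a:a0, store-b:b0, store-a:a1]
    }
}
```

With the per-cache loop from the diff, the same resize would evict `a0`, `a1` and `a2` from `store-a` before touching `store-b`; the round-robin order spreads the emitted records across both stores.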

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       I am in favour of keeping a reference to the thread cache in the `StreamThread` and doing the resizing here. I think it makes the code a bit easier to follow.
       You will need synchronization, because the thread that adds a new stream thread will also resize the thread caches while the stream threads are using them.
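A minimal sketch of the suggested structure, assuming the cache guards its state with its own monitor: the stream thread holds a direct reference to its cache, and both `put` (called by the stream thread) and `resize` (called by the thread that adds a new stream thread) are `synchronized` on the same object. `SketchThreadCache` and `SketchStreamThread` are hypothetical names, not Kafka classes, and evicting by entry size alone is a simplification.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for ThreadCache: put() and resize() lock the same monitor,
// so a resize from another thread never interleaves with a put.
final class SketchThreadCache {
    private final Deque<Long> entrySizes = new ArrayDeque<>(); // oldest entry first
    private long maxCacheSizeBytes;
    private long sizeBytes;

    SketchThreadCache(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    synchronized void put(final long entrySize) {
        entrySizes.addLast(entrySize);
        sizeBytes += entrySize;
        evictUntilWithinLimit();
    }

    synchronized void resize(final long newMaxCacheSizeBytes) {
        maxCacheSizeBytes = newMaxCacheSizeBytes;
        evictUntilWithinLimit();
    }

    synchronized boolean withinLimit() {
        return sizeBytes <= maxCacheSizeBytes;
    }

    // Only called while holding the monitor.
    private void evictUntilWithinLimit() {
        while (sizeBytes > maxCacheSizeBytes && !entrySizes.isEmpty()) {
            sizeBytes -= entrySizes.removeFirst();
        }
    }
}

// Hypothetical stand-in for StreamThread that keeps its own cache reference
// and resizes it directly.
final class SketchStreamThread extends Thread {
    private final SketchThreadCache cache;

    SketchStreamThread(final SketchThreadCache cache) {
        this.cache = cache;
    }

    // Invoked by whichever thread adds a new stream thread.
    void resizeCache(final long size) {
        cache.resize(size);
    }

    @Override
    public void run() {
        for (int i = 0; i < 100_000; i++) {
            cache.put(16); // processing records fills the cache
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final SketchStreamThread thread = new SketchStreamThread(new SketchThreadCache(1024));
        thread.start();
        thread.resizeCache(512); // issued from main while run() is still putting
        thread.join();
        System.out.println(thread.cache.withinLimit()); // true
    }
}
```

Without the shared lock, a resize could read `sizeBytes` while a concurrent `put` is halfway through updating it, leaving the cache over its new limit.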




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org