Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/06 18:48:46 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9572: KAFKA-10500: Thread Cache Resizes

wcarlson5 opened a new pull request #9572:
URL: https://github.com/apache/kafka/pull/9572


   The thread cache can now be resized. This is a step towards being able to scale the number of threads.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525430251



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment:
       If we add a check to make sure the number of threads is positive, then probably not. I'll add that check and then remove this one.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r523292295



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       We talked about this offline. The named cache is already synchronized. I also made the max cache size volatile.
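The point about making the max cache size volatile can be sketched as follows. This is a minimal illustration, assuming the limit is written by the thread that triggers the resize and read by the thread that owns the cache; `VolatileLimitSketch` and its methods are hypothetical names, not the actual `ThreadCache` code.

```java
// Hypothetical holder for a cache size limit that one thread updates
// while another thread reads it.
final class VolatileLimitSketch {
    // volatile: a write by the resizing thread becomes visible to
    // subsequent reads by the thread that checks the limit.
    private volatile long maxCacheSizeBytes;

    VolatileLimitSketch(final long initialMaxBytes) {
        this.maxCacheSizeBytes = initialMaxBytes;
    }

    // Called by whichever thread triggers the resize.
    void resize(final long newMaxBytes) {
        maxCacheSizeBytes = newMaxBytes;
    }

    // Called by the thread that owns the cache when deciding whether to evict.
    boolean isOverLimit(final long currentSizeBytes) {
        return currentSizeBytes > maxCacheSizeBytes;
    }
}
```

Note that volatile only covers visibility of the limit itself; any eviction triggered by the new limit still relies on the synchronization of the underlying caches.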







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525488664



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       Moved to a new method. Glad we got that cleared up. LGTM?







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525534167



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,22 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long newCacheSizeBytes) {
+        final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes;

Review comment:
       nit: we can remove `this.` now (same next line)







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525562156



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Well, `getCacheSizePerThread` would eventually return zero (with a growing number of threads), which means that every put() into the cache would result in an immediate eviction. So I don't think we need to do anything for this corner case.
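To make the arithmetic concrete, here is a small sketch of the same division as in the diff. The class name and the `hasGlobalThread` flag (standing in for `globalTaskTopology != null`) are illustrative assumptions.

```java
// Hypothetical helper that mirrors the division shown in the diff.
final class CacheSizePerThreadSketch {
    // Splits the total cache evenly across the stream threads, plus one
    // extra share when a global thread exists. Integer division truncates,
    // so the result reaches zero once the divisor exceeds the total size.
    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalThread) {
        return totalCacheSize / (numStreamThreads + (hasGlobalThread ? 1 : 0));
    }
}
```

For example, 100 bytes split across 4 stream threads and a global thread gives 20 bytes each, while 100 bytes split across 200 threads truncates to 0.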







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520699793



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       Are the thread caches not independent of each other? Also, I was not planning on having the new thread resize the cache; the calling thread would do so.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525426317



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       I did have it in a separate helper method, but when removing the `totalCacheSize < 0` check, @cadonna thought it would be more readable inline.







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525533593



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Can it be smaller than `0` ? Should the test be `<= 0` or `< 1` instead?
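One way to picture the difference between the two checks is sketched below, assuming a hypothetical helper that refuses to compute a size for fewer than one thread. It is not the code from the PR. A `< 0` guard lets zero through, and with zero stream threads and no global thread the divisor is zero, which in Java throws `ArithmeticException` for integer division.

```java
import java.util.OptionalLong;

// Hypothetical guard around the per-thread size computation.
final class ResizeGuardSketch {
    static OptionalLong perThreadSize(final long totalCacheSize,
                                      final int numStreamThreads,
                                      final boolean hasGlobalThread) {
        // "< 1" rejects zero as well as negative counts; "< 0" would not.
        if (numStreamThreads < 1) {
            return OptionalLong.empty();
        }
        final int divisor = numStreamThreads + (hasGlobalThread ? 1 : 0);
        return OptionalLong.of(totalCacheSize / divisor);
    }
}
```

Returning an empty result is only one option for the rejected case; logging and returning early would work equally well.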







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525464495



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment:
       Yeah, in retrospect it was not very clear. Hopefully it's better this way now.







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525540418



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Yes, it can be zero, but the check says `< 0`, so it would always evaluate to false?
   
   And if we have zero threads, we should not resize the cache as we might end up in an infinite loop? But we would only call this method if we "shrink", i.e., if the thread count grows, but it can never grow from negative to zero, right?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520686495



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private Long totalCacheSize;

Review comment:
       yes it can







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520693132



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +71,16 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            for (final NamedCache cache : caches.values()) {
+                maybeEvict(cache.name());
+            }

Review comment:
       I would expect it to take a bit to repopulate the cache to be balanced, but you are right, it is probably better to do so evenly.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r519049317



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       I am not sure if I should just expose the cache or pass the call along. It is about four levels deep.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r521449419



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       I'm not sure I follow. If we make each thread responsible for resizing its own independent cache, then why would the method need to be synchronized? Each call should not affect the others. And if one thread does all the work, there should only be one call. Unless you are thinking about multiple threads adding threads at the same time?
   
   Maybe I don't understand how the cache is set up well enough? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private final Long totalCacheSize;

Review comment:
       I think that's just what the IDE defaulted to. I'll change it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,17 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }

Review comment:
       I think I agree with you. fixed







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525428102



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {

Review comment:
       sure, that works







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520686307



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,21 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        if (totalCacheSize < 0) {
+            totalCacheSize = 0L;
+            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+        }

Review comment:
       good to know







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525425650



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -728,12 +729,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 "must subscribe to at least one source topic or global table.");
         }
 
-        long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        if (totalCacheSize < 0) {
-            totalCacheSize = 0;

Review comment:
       https://github.com/apache/kafka/pull/9572#discussion_r520444420
   
   It seems it was not necessary







[GitHub] [kafka] wcarlson5 commented on pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#issuecomment-723242241


   @cadonna Part 2





[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525463645



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       If this line is duplicated, it should go in a method. When I proposed to move it inline, I was apparently not aware that the same line was used somewhere else.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525459352



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       I think it was about readability. I might be misremembering though, as it was a conversation we had last week







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525439819



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment:
       I see. -- I guess the misleading part was that this check was done inside the while-loop.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525463165



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment:
       I think we can; that's probably a good idea.







[GitHub] [kafka] mjsax commented on pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#issuecomment-730028264


   Thanks for the PR @wcarlson5. Merged to `trunk`.





[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525437647



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment:
       Ah. I see. -- Should we pass `java.util.function.Consumer<Long> cacheResizer` into `StreamThread` constructor for this case instead?
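A rough sketch of that constructor-injection idea follows. `SketchThreadCache` and `SketchStreamThread` are hypothetical stand-ins for illustration only; the real `StreamThread` constructor takes many more arguments.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the cache being resized.
final class SketchThreadCache {
    private volatile long maxBytes;

    SketchThreadCache(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void resize(final long newMaxBytes) {
        this.maxBytes = newMaxBytes;
    }

    long maxBytes() {
        return maxBytes;
    }
}

// Hypothetical stand-in for the thread: it only knows about a callback,
// not about how deep in the object graph the cache actually lives.
final class SketchStreamThread {
    private final Consumer<Long> cacheResizer;

    SketchStreamThread(final Consumer<Long> cacheResizer) {
        this.cacheResizer = cacheResizer;
    }

    public void resizeCache(final long size) {
        cacheResizer.accept(size);
    }
}
```

Whoever constructs the thread can then wire it up with a method reference such as `new SketchStreamThread(cache::resize)`, which avoids passing the resize call through several intermediate layers.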







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525540418



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Yes, it can be zero, but the check says `< 0`, so it would always evaluate to false?
   
   And if we have zero threads, we should not resize the cache as we might end up in an infinite loop? But we would only call this method if we "shrink", i.e., if the thread count grows, but it can never grow from negative to zero, right?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525541523



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,22 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long newCacheSizeBytes) {
+        final boolean shrink = newCacheSizeBytes < this.maxCacheSizeBytes;

Review comment:
       yep







[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r520445787



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private Long totalCacheSize;

Review comment:
       I think this can be a `final long` if we remove the check as I proposed below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,21 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        if (totalCacheSize < 0) {
+            totalCacheSize = 0L;
+            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+        }

Review comment:
       I see that this check was there before, but I actually think it is not needed because the configs are validated and there `CACHE_MAX_BYTES_BUFFERING_CONFIG` is specified as at least 0.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +71,16 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            for (final NamedCache cache : caches.values()) {
+                maybeEvict(cache.name());
+            }

Review comment:
       This loop has the disadvantage that it first evicts entries from one named cache; only if all of its entries are evicted and we still need to free space does it start to evict entries from the next named cache, and so on. I guess it would be better to avoid such a skewed emission of records downstream by continuously iterating over the named caches and evicting one entry at a time from each named cache until enough space is freed.
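
The round-robin idea in the comment above could look roughly like the following. This is only a sketch under assumptions: `RoundRobinEvictionSketch`, `shrinkTo`, and `totalBytes` are made-up names, and each named cache is modelled as a plain queue of entry sizes rather than Kafka's `NamedCache`. It is not the code that ended up in the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class RoundRobinEvictionSketch {

    // Hypothetical stand-in for a named cache: a queue of entry sizes in bytes,
    // oldest entry first.
    static long totalBytes(final List<Deque<Long>> caches) {
        long sum = 0;
        for (final Deque<Long> cache : caches) {
            for (final long size : cache) {
                sum += size;
            }
        }
        return sum;
    }

    // Evicts one entry at a time, cycling over the caches, until the total size
    // fits into maxBytes or every cache is empty.
    // Returns the index of the cache that each eviction was taken from, in order.
    static List<Integer> shrinkTo(final List<Deque<Long>> caches, final long maxBytes) {
        final List<Integer> evictionOrder = new ArrayList<>();
        long used = totalBytes(caches);
        int index = 0;
        int emptyInARow = 0; // stops the loop once a full cycle found nothing to evict
        while (used > maxBytes && emptyInARow < caches.size()) {
            final Deque<Long> cache = caches.get(index);
            if (cache.isEmpty()) {
                emptyInARow++;
            } else {
                used -= cache.pollFirst();
                evictionOrder.add(index);
                emptyInARow = 0;
            }
            index = (index + 1) % caches.size();
        }
        return evictionOrder;
    }

    public static void main(final String[] args) {
        final List<Deque<Long>> caches = new ArrayList<>();
        caches.add(new ArrayDeque<>(Arrays.asList(10L, 10L, 10L)));
        caches.add(new ArrayDeque<>(Arrays.asList(10L, 10L, 10L)));
        // 60 bytes in use, target 30: evictions alternate between the two caches.
        System.out.println(shrinkTo(caches, 30L)); // prints [0, 1, 0]
    }
}
```

With the per-cache loop from the diff, all three evictions would presumably come from the first cache; here they are spread across both.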

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       I am in favour of keeping a reference to the thread cache in the `StreamThread` and doing the resizing here. I think it makes the code a bit easier to follow.
   You will need synchronization, because the thread that adds the new stream thread will also resize the thread caches.







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525562156



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Well, `getCacheSizePerThread` would eventually return zero, which means that every put() into the cache would result in an immediate eviction. So I don't think we need to do anything for this corner case.







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525435452



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       Not sure why the `totalCacheSize` check is relevant for avoiding code duplication?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525561178



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       That is a good point. Maybe what we need to do is set a minimum cache size to limit how many stream threads an instance can have?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525427943



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment:
       The cache is not exposed in stream thread. Since I was only using one method I thought it best to only expose that.







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525399396



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -728,12 +729,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 "must subscribe to at least one source topic or global table.");
         }
 
-        long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        if (totalCacheSize < 0) {
-            totalCacheSize = 0;
-            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
-        }
-        final long cacheSizePerThread = totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
+        totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        final long cacheSizePerThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       Why move off using `hasGlobalTopology`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -728,12 +729,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 "must subscribe to at least one source topic or global table.");
         }
 
-        long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        if (totalCacheSize < 0) {
-            totalCacheSize = 0;

Review comment:
       Why do we remove this guard?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {
+        final boolean shrink = maxCacheSizeBytes < this.maxCacheSizeBytes;
+        this.maxCacheSizeBytes = maxCacheSizeBytes;
+        if (shrink) {
+            final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+            while (sizeBytes() > maxCacheSizeBytes) {
+                if (!circularIterator.hasNext()) {
+                    log.error("Unable to remove any more entries as all caches are empty");

Review comment:
       Could this ever happen? If the max cache size is smaller than a single entry, would we not evict the entry, so that the used cache size would always shrink to zero?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +593,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        cacheResizer.accept(size);

Review comment:
       Why do we need the `cacheResizer`? Can't we just call `cache.resize(size)` here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -71,6 +72,26 @@ public long flushes() {
         return numFlushes;
     }
 
+    public void resize(final long maxCacheSizeBytes) {

Review comment:
       nit: `newCacheSizeBytes`? (To avoid the "clash" with `this.maxCacheSizeBytes`.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       Seems this duplicates `L733`. Might be good to extract into a small helper method.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525582298



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       that is a good point







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525536944



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       It can be zero if you have a global thread, but since this is internal the check might not be entirely necessary







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525427249



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -728,12 +729,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 "must subscribe to at least one source topic or global table.");
         }
 
-        long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        if (totalCacheSize < 0) {
-            totalCacheSize = 0;
-            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
-        }
-        final long cacheSizePerThread = totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
+        totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        final long cacheSizePerThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       It was in a separate method without access to `hasGlobalTopology`. I suppose if it stays we can move it back.







[GitHub] [kafka] mjsax merged pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9572:
URL: https://github.com/apache/kafka/pull/9572


   





[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525532300



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private void resizeThreadCache(final int numStreamThreads) {
+        final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));

Review comment:
       > LGTM?
   
   If this is a question, should it be LGTY? 😂 







[GitHub] [kafka] mjsax commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r525562156



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,20 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }
+
+    private void resizeThreadCache(final int numStreamThreads) {
+        if (numStreamThreads < 0) {

Review comment:
       Well, `getCacheSizePerThread` would eventually return zero (with a growing number of threads), which means that every put() into the cache would result in an immediate eviction. So I don't think we need to do anything for this corner case.
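
To make the arithmetic concrete, here is a small standalone sketch of the division from the diff above. The class name `CacheSizePerThreadSketch` and the method parameters are assumptions introduced so the example compiles on its own; in the PR the values come from fields of `KafkaStreams`.

```java
public class CacheSizePerThreadSketch {

    // Mirrors the division in the diff above; the parameters replace the
    // KafkaStreams fields so that the sketch is self-contained.
    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalTopology) {
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }

    public static void main(final String[] args) {
        final long totalCacheSize = 10L; // deliberately tiny, in bytes
        for (final int threads : new int[] {1, 4, 9, 10, 20}) {
            System.out.println(threads + " threads -> "
                + cacheSizePerThread(totalCacheSize, threads, true) + " bytes each");
        }
    }
}
```

With a 10-byte total and a global thread, the per-thread size comes out as 5, 2, 1, 0, and 0 bytes: integer division truncates to zero as soon as the divisor exceeds the total. Separately, in this sketch a call with zero stream threads and no global topology would divide by zero and throw an `ArithmeticException`.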







[GitHub] [kafka] cadonna commented on a change in pull request #9572: KAFKA-10500: Thread Cache Resizes

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9572:
URL: https://github.com/apache/kafka/pull/9572#discussion_r521249074



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -806,6 +803,17 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private long getCacheSizePerThread(final int numStreamThreads) {
+        return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
+    }

Review comment:
       IMO, the code would be easier to navigate if you inlined this method. With the check removed, there is not really a reason to have a method here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -589,6 +589,10 @@ private void subscribeConsumer() {
         }
     }
 
+    public void resizeCache(final long size) {
+        taskManager.resizeCache(size);

Review comment:
       > I was not planning on having the new thread resize the cache but the calling thread do so
   
   That is what I am saying: "the thread that will add the new stream thread" is the calling thread. The new stream thread cannot resize the caches of the other stream threads because it is not aware of them. Still, we need synchronization, because the calling thread will access and modify the thread caches of all stream threads, and all stream threads will access and modify their own thread cache during normal processing.
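
One possible shape for the synchronization mentioned above, assuming coarse-grained locking with `synchronized` methods. `SynchronizedResizeSketch` and `TinyCache` are hypothetical names, and the "cache" only tracks byte counts; this illustrates the concern, not how `ThreadCache` is actually implemented.

```java
public class SynchronizedResizeSketch {

    // Hypothetical, heavily simplified stand-in for a per-thread cache.
    static final class TinyCache {
        private long maxBytes;
        private long usedBytes;

        TinyCache(final long maxBytes) {
            this.maxBytes = maxBytes;
        }

        // Called by the owning stream thread during normal processing.
        synchronized void put(final long entryBytes) {
            usedBytes += entryBytes;
            evictIfNeeded();
        }

        // Called by whichever thread adds or removes a stream thread.
        synchronized void resize(final long newMaxBytes) {
            maxBytes = newMaxBytes;
            evictIfNeeded();
        }

        synchronized long usedBytes() {
            return usedBytes;
        }

        // Only called while holding the lock; "evicts" by trimming the byte count.
        private void evictIfNeeded() {
            if (usedBytes > maxBytes) {
                usedBytes = maxBytes;
            }
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final TinyCache cache = new TinyCache(1_000L);
        // The owner thread keeps writing while the main thread shrinks the cache.
        final Thread owner = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                cache.put(1L);
            }
        });
        owner.start();
        cache.resize(100L);
        owner.join();
        System.out.println(cache.usedBytes());
    }
}
```

Because `put` and `resize` take the same lock, the owner thread never observes a half-applied resize, and the used size stays within the new limit regardless of how the two threads interleave.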

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -155,6 +155,7 @@
     private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
+    private final Long totalCacheSize;

Review comment:
       Why does this need to be a `Long` instead of a `long`? The numerical value of the variable is only immutable if we use a `long` here.  



