Posted to jira@kafka.apache.org by "Jackson Newhouse (Jira)" <ji...@apache.org> on 2022/05/25 23:03:00 UTC

[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

    [ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542269#comment-17542269 ] 

Jackson Newhouse commented on KAFKA-13939:
------------------------------------------

One way to patch this would be something like
{code:java}
jnewhouse - kafka % git diff
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 2909e2763f..b0ee755b3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -403,7 +403,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 delegate.remove();
                 index.remove(next.getKey().key());
 
-                dirtyKeys.add(next.getKey().key());
+                if (loggingEnabled) {
+                    dirtyKeys.add(next.getKey().key());
+                }
 
                 memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue);
 
@@ -478,7 +480,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             serializedKey,
             new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
         );
-        dirtyKeys.add(serializedKey);
+        if (loggingEnabled) {
+            dirtyKeys.add(serializedKey);
+        }
         updateBufferMetrics();
     }
{code}
Since `loggingEnabled` is final, we can skip tracking dirty keys entirely when it is false: the set is only read when `loggingEnabled` is true.
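To illustrate the reasoning, here is a minimal standalone sketch of the pattern. It is not the real Kafka class: `DirtyKeyBufferSketch`, `recordChange`, `guardWrites`, and `dirtyKeyCount` are hypothetical names made up for this example. `recordChange` stands in for both patched sites (the eviction loop and the put path), and `flush` only clears the set when logging is enabled, as described in the issue.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the buffer; NOT the real InMemoryTimeOrderedKeyValueBuffer. */
final class DirtyKeyBufferSketch {
    private final boolean loggingEnabled;
    // Illustration-only switch: true applies the guard from the proposed patch.
    private final boolean guardWrites;
    private final Set<String> dirtyKeys = new HashSet<>();

    DirtyKeyBufferSketch(boolean loggingEnabled, boolean guardWrites) {
        this.loggingEnabled = loggingEnabled;
        this.guardWrites = guardWrites;
    }

    /** Stands in for the two places that call dirtyKeys.add(...) in the diff. */
    void recordChange(String key) {
        if (!guardWrites || loggingEnabled) {
            dirtyKeys.add(key);
        }
    }

    /** Mirrors the reported behavior: the set is only drained when logging is enabled. */
    void flush() {
        if (loggingEnabled) {
            // The real buffer would write changelog records here before clearing.
            dirtyKeys.clear();
        }
    }

    int dirtyKeyCount() {
        return dirtyKeys.size();
    }

    public static void main(String[] args) {
        DirtyKeyBufferSketch unguarded = new DirtyKeyBufferSketch(false, false);
        DirtyKeyBufferSketch guarded = new DirtyKeyBufferSketch(false, true);
        for (int i = 0; i < 1000; i++) {
            unguarded.recordChange("key-" + i);
            guarded.recordChange("key-" + i);
            if (i % 100 == 99) {
                // Periodic flush, as a commit would trigger.
                unguarded.flush();
                guarded.flush();
            }
        }
        System.out.println("unguarded dirty keys: " + unguarded.dirtyKeyCount());
        System.out.println("guarded dirty keys:   " + guarded.dirtyKeyCount());
    }
}
```

With logging disabled, the unguarded instance retains every distinct key it has ever seen, while the guarded instance stays empty. With logging enabled, both variants behave the same because `flush` drains the set.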

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-13939
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13939
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jackson Newhouse
>            Priority: Major
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within `flush()`; see [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262]. However, `dirtyKeys` is still written to in the loop within `evictWhile`. This causes `dirtyKeys` to grow continuously for the life of the buffer.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)