You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/06/16 21:27:48 UTC

[kafka] branch trunk updated: KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
ee565f5f6b is described below

commit ee565f5f6b97c84d4f7f895fcb79188822284414
Author: jnewhouse <jn...@quantcast.com>
AuthorDate: Thu Jun 16 14:27:38 2022 -0700

    KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
    
    InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. If logging is not enabled, this set is neither used nor cleared, so populating it creates a memory leak. This change stops populating the set when logging is not enabled.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Kvicii <42...@users.noreply.github.com>, Guozhang Wang <wa...@gmail.com>
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java        | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5894023bbe..5403f9e703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 delegate.remove();
                 index.remove(next.getKey().key());
 
-                dirtyKeys.add(next.getKey().key());
+                if (loggingEnabled) {
+                    dirtyKeys.add(next.getKey().key());
+                }
 
                 memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue);
 
@@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             serializedKey,
             new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
         );
-        dirtyKeys.add(serializedKey);
+        if (loggingEnabled) {
+            dirtyKeys.add(serializedKey);
+        }
         updateBufferMetrics();
     }