You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/18 08:47:37 UTC

kafka git commit: KAFKA-6023 ThreadCache#sizeBytes() should check overflow

Repository: kafka
Updated Branches:
  refs/heads/trunk 5cc162f30 -> 68f324f4b


KAFKA-6023 ThreadCache#sizeBytes() should check overflow

    long sizeBytes() {
        long sizeInBytes = 0;
        for (final NamedCache namedCache : caches.values()) {
            sizeInBytes += namedCache.sizeInBytes();
        }
        return sizeInBytes;
    }
The running sum accumulated in sizeInBytes may overflow.
An overflow check similar to the one done in size() should be performed.

Author: siva santhalingam <si...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #4041 from shivsantham/kafka-6023


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f324f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f324f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f324f4

Branch: refs/heads/trunk
Commit: 68f324f4bf0003d5dcfd79c5ab7f9c53bd0c1522
Parents: 5cc162f
Author: siva santhalingam <si...@gmail.com>
Authored: Wed Oct 18 09:44:39 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Oct 18 09:44:39 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/state/internals/ThreadCache.java    | 7 +++----
 .../kafka/streams/state/internals/ThreadCacheTest.java       | 8 ++++++++
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68f324f4/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index aab9671..01a4bef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -205,10 +205,6 @@ public class ThreadCache {
                 return Long.MAX_VALUE;
             }
         }
-
-        if (isOverflowing(size)) {
-            return Long.MAX_VALUE;
-        }
         return size;
     }
 
@@ -220,6 +216,9 @@ public class ThreadCache {
         long sizeInBytes = 0;
         for (final NamedCache namedCache : caches.values()) {
             sizeInBytes += namedCache.sizeInBytes();
+            if (isOverflowing(sizeInBytes)) {
+                return Long.MAX_VALUE;
+            }
         }
         return sizeInBytes;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f324f4/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 16fd34b..164e71e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -511,6 +511,14 @@ public class ThreadCacheTest {
         assertNull(threadCache.get(namespace, null));
     }
 
+    @Test
+    public void shouldCalculateSizeInBytes() {
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
+        NamedCache.LRUNode node = new NamedCache.LRUNode(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{0}));
+        cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
+        assertEquals(cache.sizeBytes(), node.size());
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }