You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/18 08:47:37 UTC
kafka git commit: KAFKA-6023 ThreadCache#sizeBytes() should check
overflow
Repository: kafka
Updated Branches:
refs/heads/trunk 5cc162f30 -> 68f324f4b
KAFKA-6023 ThreadCache#sizeBytes() should check overflow
long sizeBytes() {
long sizeInBytes = 0;
for (final NamedCache namedCache : caches.values()) {
sizeInBytes += namedCache.sizeInBytes();
}
return sizeInBytes;
}
The summation w.r.t. sizeInBytes may overflow.
Check similar to what is done in size() should be performed.
Author: siva santhalingam <si...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4041 from shivsantham/kafka-6023
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f324f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f324f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f324f4
Branch: refs/heads/trunk
Commit: 68f324f4bf0003d5dcfd79c5ab7f9c53bd0c1522
Parents: 5cc162f
Author: siva santhalingam <si...@gmail.com>
Authored: Wed Oct 18 09:44:39 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Oct 18 09:44:39 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/streams/state/internals/ThreadCache.java | 7 +++----
.../kafka/streams/state/internals/ThreadCacheTest.java | 8 ++++++++
2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f324f4/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index aab9671..01a4bef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -205,10 +205,6 @@ public class ThreadCache {
return Long.MAX_VALUE;
}
}
-
- if (isOverflowing(size)) {
- return Long.MAX_VALUE;
- }
return size;
}
@@ -220,6 +216,9 @@ public class ThreadCache {
long sizeInBytes = 0;
for (final NamedCache namedCache : caches.values()) {
sizeInBytes += namedCache.sizeInBytes();
+ if (isOverflowing(sizeInBytes)) {
+ return Long.MAX_VALUE;
+ }
}
return sizeInBytes;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68f324f4/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 16fd34b..164e71e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -511,6 +511,14 @@ public class ThreadCacheTest {
assertNull(threadCache.get(namespace, null));
}
+ @Test
+ public void shouldCalculateSizeInBytes() {
+ final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
+ NamedCache.LRUNode node = new NamedCache.LRUNode(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{0}));
+ cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
+ assertEquals(cache.sizeBytes(), node.size());
+ }
+
private LRUCacheEntry dirtyEntry(final byte[] key) {
return new LRUCacheEntry(key, true, -1, -1, -1, "");
}