You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/04/17 23:22:13 UTC
[kafka] branch trunk updated: [HOT FIX] Check for null before
deserializing in MeteredSessionStore (#6575)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c783630 [HOT FIX] Check for null before deserializing in MeteredSessionStore (#6575)
c783630 is described below
commit c7836307c3588ed5d267f32eabcd7dfc0dbeec80
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Apr 17 16:21:59 2019 -0700
[HOT FIX] Check for null before deserializing in MeteredSessionStore (#6575)
The fetchSession() method of SessionStore searches for a (single) specific session and returns null if none are found. This is analogous to fetch(key, time) in WindowStore or get(key) in KeyValueStore. MeteredWindowStore and MeteredKeyValueStore both check for a null result before attempting to deserialize, however MeteredSessionStore just blindly deserializes and as a result NPE is thrown when we search for a record that does not exist.
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>, Bruno Cadonna <br...@confluent.io>
---
.../state/internals/MeteredSessionStore.java | 23 ++++++++++++++++------
.../state/internals/MeteredKeyValueStoreTest.java | 9 +++++++++
.../state/internals/MeteredSessionStoreTest.java | 13 ++++++++++--
.../state/internals/MeteredWindowStoreTest.java | 2 +-
4 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 4631601..94b004e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -151,22 +151,28 @@ public class MeteredSessionStore<K, V>
@Override
public V fetchSession(final K key, final long startTime, final long endTime) {
Objects.requireNonNull(key, "key cannot be null");
- final V value;
final Bytes bytesKey = keyBytes(key);
final long startNs = time.nanoseconds();
try {
- value = serdes.valueFrom(wrapped().fetchSession(bytesKey, startTime, endTime));
+ final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime);
+ if (result == null) {
+ return null;
+ }
+ return serdes.valueFrom(result);
} finally {
metrics.recordLatency(flushTime, startNs, time.nanoseconds());
}
-
- return value;
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
- return findSessions(key, 0, Long.MAX_VALUE);
+ return new MeteredWindowedKeyValueIterator<>(
+ wrapped().fetch(keyBytes(key)),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@Override
@@ -174,7 +180,12 @@ public class MeteredSessionStore<K, V>
final K to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
- return findSessions(from, to, 0, Long.MAX_VALUE);
+ return new MeteredWindowedKeyValueIterator<>(
+ wrapped().fetch(keyBytes(from), keyBytes(to)),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index b8fc88e..5cbe95c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -55,6 +55,7 @@ import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
@@ -244,6 +245,14 @@ public class MeteredKeyValueStoreTest {
}
@Test
+ public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() {
+ expect(inner.get(Bytes.wrap("a".getBytes()))).andReturn(null);
+
+ init();
+ assertNull(metered.get("a"));
+ }
+
+ @Test
public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
assertFalse(metered.setFlushListener(null, false));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index b349f17..30c382b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -57,6 +57,7 @@ import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
@@ -172,7 +173,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldFetchForKeyAndRecordFetchMetric() {
- expect(inner.findSessions(Bytes.wrap(keyBytes), 0, Long.MAX_VALUE))
+ expect(inner.fetch(Bytes.wrap(keyBytes)))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
init();
@@ -189,7 +190,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldFetchRangeFromStoreAndRecordFetchMetric() {
- expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, Long.MAX_VALUE))
+ expect(inner.fetch(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes)))
.andReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
init();
@@ -211,6 +212,14 @@ public class MeteredSessionStoreTest {
assertTrue((Double) metric.metricValue() > 0);
}
+ @Test
+ public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() {
+ expect(inner.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null);
+
+ init();
+ assertNull(metered.fetchSession("a", 0, Long.MAX_VALUE));
+ }
+
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnPutIfKeyIsNull() {
metered.put(null, "a");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 962888a..c0ed7f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -178,7 +178,7 @@ public class MeteredWindowStoreTest {
}
@Test
- public void shouldNotExceptionIfFetchReturnsNull() {
+ public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
replay(innerStoreMock);