You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/04/17 23:22:13 UTC

[kafka] branch trunk updated: [HOT FIX] Check for null before deserializing in MeteredSessionStore (#6575)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c783630  [HOT FIX] Check for null before deserializing in MeteredSessionStore  (#6575)
c783630 is described below

commit c7836307c3588ed5d267f32eabcd7dfc0dbeec80
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Apr 17 16:21:59 2019 -0700

    [HOT FIX] Check for null before deserializing in MeteredSessionStore  (#6575)
    
    The fetchSession() method of SessionStore searches for a (single) specific session and returns null if none is found. This is analogous to fetch(key, time) in WindowStore or get(key) in KeyValueStore. MeteredWindowStore and MeteredKeyValueStore both check for a null result before attempting to deserialize; however, MeteredSessionStore just blindly deserializes, and as a result a NullPointerException (NPE) is thrown when we search for a record that does not exist.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>, Bruno Cadonna <br...@confluent.io>
---
 .../state/internals/MeteredSessionStore.java       | 23 ++++++++++++++++------
 .../state/internals/MeteredKeyValueStoreTest.java  |  9 +++++++++
 .../state/internals/MeteredSessionStoreTest.java   | 13 ++++++++++--
 .../state/internals/MeteredWindowStoreTest.java    |  2 +-
 4 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 4631601..94b004e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -151,22 +151,28 @@ public class MeteredSessionStore<K, V>
     @Override
     public V fetchSession(final K key, final long startTime, final long endTime) {
         Objects.requireNonNull(key, "key cannot be null");
-        final V value;
         final Bytes bytesKey = keyBytes(key);
         final long startNs = time.nanoseconds();
         try {
-            value = serdes.valueFrom(wrapped().fetchSession(bytesKey, startTime, endTime));
+            final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime);
+            if (result == null) {
+                return null;
+            }
+            return serdes.valueFrom(result);
         } finally {
             metrics.recordLatency(flushTime, startNs, time.nanoseconds());
         }
-
-        return value;
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
-        return findSessions(key, 0, Long.MAX_VALUE);
+        return new MeteredWindowedKeyValueIterator<>(
+            wrapped().fetch(keyBytes(key)),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @Override
@@ -174,7 +180,12 @@ public class MeteredSessionStore<K, V>
                                                   final K to) {
         Objects.requireNonNull(from, "from cannot be null");
         Objects.requireNonNull(to, "to cannot be null");
-        return findSessions(from, to, 0, Long.MAX_VALUE);
+        return new MeteredWindowedKeyValueIterator<>(
+            wrapped().fetch(keyBytes(from), keyBytes(to)),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index b8fc88e..5cbe95c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -55,6 +55,7 @@ import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
@@ -244,6 +245,14 @@ public class MeteredKeyValueStoreTest {
     }
 
     @Test
+    public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() {
+        expect(inner.get(Bytes.wrap("a".getBytes()))).andReturn(null);
+
+        init();
+        assertNull(metered.get("a"));
+    }
+
+    @Test
     public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
         assertFalse(metered.setFlushListener(null, false));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index b349f17..30c382b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -57,6 +57,7 @@ import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
@@ -172,7 +173,7 @@ public class MeteredSessionStoreTest {
 
     @Test
     public void shouldFetchForKeyAndRecordFetchMetric() {
-        expect(inner.findSessions(Bytes.wrap(keyBytes), 0, Long.MAX_VALUE))
+        expect(inner.fetch(Bytes.wrap(keyBytes)))
                 .andReturn(new KeyValueIteratorStub<>(
                         Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
         init();
@@ -189,7 +190,7 @@ public class MeteredSessionStoreTest {
 
     @Test
     public void shouldFetchRangeFromStoreAndRecordFetchMetric() {
-        expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, Long.MAX_VALUE))
+        expect(inner.fetch(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes)))
                 .andReturn(new KeyValueIteratorStub<>(
                         Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator()));
         init();
@@ -211,6 +212,14 @@ public class MeteredSessionStoreTest {
         assertTrue((Double) metric.metricValue() > 0);
     }
 
+    @Test
+    public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() {
+        expect(inner.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null);
+
+        init();
+        assertNull(metered.fetchSession("a", 0, Long.MAX_VALUE));
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnPutIfKeyIsNull() {
         metered.put(null, "a");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 962888a..c0ed7f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -178,7 +178,7 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
-    public void shouldNotExceptionIfFetchReturnsNull() {
+    public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
         expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
         replay(innerStoreMock);