You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/09 17:42:16 UTC

[kafka] branch trunk updated: KAFKA-6878: NPE when querying global state store not in READY state (#4978)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e32dcb9  KAFKA-6878: NPE when querying global state store not in READY state (#4978)
e32dcb9 is described below

commit e32dcb9a669bc354cc97cb45f14f0dbad9657693
Author: tedyu <yu...@gmail.com>
AuthorDate: Wed May 9 10:42:10 2018 -0700

    KAFKA-6878: NPE when querying global state store not in READY state (#4978)
    
    Check whether cache is null before retrieving from cache.
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../apache/kafka/streams/state/internals/CachingKeyValueStore.java   | 5 ++++-
 .../org/apache/kafka/streams/state/internals/CachingWindowStore.java | 3 +++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 45f606f..16684e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,7 +163,10 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     private byte[] getInternal(final Bytes key) {
-        final LRUCacheEntry entry = cache.get(cacheName, key);
+        LRUCacheEntry entry = null;
+        if (cache != null) {
+            entry = cache.get(cacheName, key);
+        }
         if (entry == null) {
             final byte[] rawValue = underlying.get(key);
             if (rawValue == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 58111a6..1d0455b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -160,6 +160,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         validateStoreOpen();
         final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
+        if (cache == null) {
+            return underlying.fetch(key, timestamp);
+        }
         final LRUCacheEntry entry = cache.get(name, cacheKey);
         if (entry == null) {
             return underlying.fetch(key, timestamp);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.