You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/10 00:07:38 UTC

[kafka] branch 1.1 updated: KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new a7664f2  KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
a7664f2 is described below

commit a7664f22fa7f22cc724864f073e2074f4ff0ba91
Author: tedyu <yu...@gmail.com>
AuthorDate: Wed May 9 17:06:47 2018 -0700

    KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
    
    This is continuation of #4978.
    From Guozhang:
    
    I think to fix this issue, in init we could consider switching the steps of 1 and 2:
    
    initInternal(context);
    underlying.init(context, root);
    
    since
    
    volatile boolean open = false;
    
    it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/state/internals/CachingKeyValueStore.java    |  2 +-
 .../kafka/streams/state/internals/CachingSessionStore.java     |  2 +-
 .../kafka/streams/state/internals/CachingWindowStore.java      | 10 +++++++---
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9eebc16..5db3f68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -60,8 +60,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
         streamThread = Thread.currentThread();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 31b9d75..022f6f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -64,8 +64,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     public void init(final ProcessorContext context, final StateStore root) {
         topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
-        bytesStore.init(context, root);
         initInternal((InternalProcessorContext) context);
+        bytesStore.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ad0bd99..a78978b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -67,8 +67,8 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         keySchema.init(context.applicationId());
     }
 
@@ -163,7 +163,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         validateStoreOpen();
 
         final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(key, timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
@@ -186,7 +188,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         validateStoreOpen();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(from, to, timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.