You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/10 00:07:38 UTC
[kafka] branch 1.1 updated: KAFKA-6878 Switch the order of
underlying.init and initInternal (#4988)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new a7664f2 KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
a7664f2 is described below
commit a7664f22fa7f22cc724864f073e2074f4ff0ba91
Author: tedyu <yu...@gmail.com>
AuthorDate: Wed May 9 17:06:47 2018 -0700
KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
This is continuation of #4978.
From Guozhang:
I think to fix this issue, in init we could consider switching the steps of 1 and 2:
initInternal(context);
underlying.init(context, root);
since
volatile boolean open = false;
it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kafka/streams/state/internals/CachingKeyValueStore.java | 2 +-
.../kafka/streams/state/internals/CachingSessionStore.java | 2 +-
.../kafka/streams/state/internals/CachingWindowStore.java | 10 +++++++---
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9eebc16..5db3f68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -60,8 +60,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public void init(final ProcessorContext context, final StateStore root) {
- underlying.init(context, root);
initInternal(context);
+ underlying.init(context, root);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 31b9d75..022f6f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -64,8 +64,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
public void init(final ProcessorContext context, final StateStore root) {
topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
- bytesStore.init(context, root);
initInternal((InternalProcessorContext) context);
+ bytesStore.init(context, root);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ad0bd99..a78978b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -67,8 +67,8 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
@Override
public void init(final ProcessorContext context, final StateStore root) {
- underlying.init(context, root);
initInternal(context);
+ underlying.init(context, root);
keySchema.init(context.applicationId());
}
@@ -163,7 +163,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
validateStoreOpen();
final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(key, timeFrom, timeTo);
-
+ if (cache == null) {
+ return underlyingIterator;
+ }
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom));
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
@@ -186,7 +188,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
validateStoreOpen();
final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(from, to, timeFrom, timeTo);
-
+ if (cache == null) {
+ return underlyingIterator;
+ }
final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.