You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/12 22:27:06 UTC
kafka git commit: KAFKA-5216: Fix peekNextKey in cached
window/session store iterators
Repository: kafka
Updated Branches:
refs/heads/trunk 794e6dbd1 -> da0b5b859
KAFKA-5216: Fix peekNextKey in cached window/session store iterators
guozhangwang mjsax dguy
Author: Xavier Léauté <xa...@confluent.io>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes #3016 from xvrl/kafka-5216
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da0b5b85
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da0b5b85
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da0b5b85
Branch: refs/heads/trunk
Commit: da0b5b8596fa20836b8c80c473b3f37af96c9b96
Parents: 794e6db
Author: Xavier Léauté <xa...@confluent.io>
Authored: Fri May 12 15:27:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 15:27:03 2017 -0700
----------------------------------------------------------------------
.../AbstractMergedSortedCacheStoreIterator.java | 2 +-
...SortedCacheWrappedWindowStoreIteratorTest.java | 18 ++++++++++++++++--
2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 438c5b2..c5c1a2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -134,7 +134,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
}
if (nextStoreKey == null) {
- return serdes.keyFrom(nextCacheKey.get());
+ return deserializeCacheKey(nextCacheKey);
}
final int comparison = compare(nextCacheKey, nextStoreKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index 5bc4e88..2048688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -72,7 +72,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
}
@Test
- public void shouldPeekNextKey() throws Exception {
+ public void shouldPeekNextStoreKey() throws Exception {
windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
@@ -85,4 +85,18 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
assertThat(iterator.peekNextKey(), equalTo(10L));
}
-}
\ No newline at end of file
+ @Test
+ public void shouldPeekNextCacheKey() throws Exception {
+ windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
+ cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+ Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+ Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+ final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+ final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+ assertThat(iterator.peekNextKey(), equalTo(0L));
+ iterator.next();
+ assertThat(iterator.peekNextKey(), equalTo(10L));
+ }
+
+}