You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/12 22:27:06 UTC

kafka git commit: KAFKA-5216: Fix peekNextKey in cached window/session store iterators

Repository: kafka
Updated Branches:
  refs/heads/trunk 794e6dbd1 -> da0b5b859


KAFKA-5216: Fix peekNextKey in cached window/session store iterators

guozhangwang mjsax dguy

Author: Xavier Léauté <xa...@confluent.io>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #3016 from xvrl/kafka-5216


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da0b5b85
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da0b5b85
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da0b5b85

Branch: refs/heads/trunk
Commit: da0b5b8596fa20836b8c80c473b3f37af96c9b96
Parents: 794e6db
Author: Xavier Léauté <xa...@confluent.io>
Authored: Fri May 12 15:27:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 15:27:03 2017 -0700

----------------------------------------------------------------------
 .../AbstractMergedSortedCacheStoreIterator.java   |  2 +-
 ...SortedCacheWrappedWindowStoreIteratorTest.java | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 438c5b2..c5c1a2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -134,7 +134,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
         }
 
         if (nextStoreKey == null) {
-            return serdes.keyFrom(nextCacheKey.get());
+            return deserializeCacheKey(nextCacheKey);
         }
 
         final int comparison = compare(nextCacheKey, nextStoreKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index 5bc4e88..2048688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -72,7 +72,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
     }
 
     @Test
-    public void shouldPeekNextKey() throws Exception {
+    public void shouldPeekNextStoreKey() throws Exception {
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
         cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
         Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
@@ -85,4 +85,18 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(10L));
     }
 
-}
\ No newline at end of file
+    @Test
+    public void shouldPeekNextCacheKey() throws Exception {
+        windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
+        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        assertThat(iterator.peekNextKey(), equalTo(0L));
+        iterator.next();
+        assertThat(iterator.peekNextKey(), equalTo(10L));
+    }
+
+}