You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/15 10:36:02 UTC

kafka git commit: KAFKA-5908; fix range query in CompositeReadOnlyWindowStore

Repository: kafka
Updated Branches:
  refs/heads/trunk f6f56a645 -> 985cc534a


KAFKA-5908; fix range query in CompositeReadOnlyWindowStore

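The `NextIteratorFunction` in `CompositeReadOnlyWindowStore` was incorrectly passing `timeFrom` as both the start and the end of the time range when calling `fetch` on each underlying store, so the `timeTo` bound of a range query was ignored. It now passes `timeTo` as the end of the range.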

Author: Damian Guy <da...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3868 from dguy/window-store-range-scan


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/985cc534
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/985cc534
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/985cc534

Branch: refs/heads/trunk
Commit: 985cc534a4278b64fc37dc37913ab974266efe43
Parents: f6f56a6
Author: Damian Guy <da...@gmail.com>
Authored: Fri Sep 15 11:35:57 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Fri Sep 15 11:35:57 2017 +0100

----------------------------------------------------------------------
 .../state/internals/CompositeReadOnlyWindowStore.java       | 2 +-
 .../state/internals/CompositeReadOnlyWindowStoreTest.java   | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/985cc534/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 298573c..5acb6b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -72,7 +72,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
         final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>> nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>() {
             @Override
             public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K, V> store) {
-                return store.fetch(from, to, timeFrom, timeFrom);
+                return store.fetch(from, to, timeFrom, timeTo);
             }
         };
         return new DelegatingPeekingKeyValueIterator<>(storeName,

http://git-wip-us.apache.org/repos/asf/kafka/blob/985cc534/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index ae0679c..f887858 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.StateStoreProviderStub;
@@ -172,9 +173,11 @@ public class CompositeReadOnlyWindowStoreTest {
                 ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
         stubProviderTwo.addStore(storeName, secondUnderlying);
         underlyingWindowStore.put("a", "a", 0L);
-        secondUnderlying.put("b", "b", 0L);
-        List<KeyValue<Windowed<String>, String>> results = StreamsTestUtils.toList(windowStore.fetch("a", "b", 0, 1));
-        assertThat(results.size(), equalTo(2));
+        secondUnderlying.put("b", "b", 10L);
+        List<KeyValue<Windowed<String>, String>> results = StreamsTestUtils.toList(windowStore.fetch("a", "b", 0, 10));
+        assertThat(results, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"),
+                KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"))));
     }
 
     @Test(expected = NullPointerException.class)