You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/19 00:02:56 UTC
[2/3] kafka git commit: KAFKA-5192: add WindowStore range scan
(KIP-155)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 3036f79..77b92a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -38,9 +38,12 @@ import java.util.concurrent.ConcurrentHashMap;
*/
class Segments {
private static final Logger log = LoggerFactory.getLogger(Segments.class);
-
static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
+ static long segmentInterval(long retentionPeriod, int numSegments) {
+ return Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+ }
+
private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
private final String name;
private final int numSegments;
@@ -52,7 +55,7 @@ class Segments {
Segments(final String name, final long retentionPeriod, final int numSegments) {
this.name = name;
this.numSegments = numSegments;
- this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+ this.segmentInterval = segmentInterval(retentionPeriod, numSegments);
// Create a date formatter. Formatted timestamps are used as segment name suffixes
this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 80785b2..6d6d9bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -23,10 +23,15 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
+import java.nio.ByteBuffer;
import java.util.List;
class SessionKeySchema implements SegmentedBytesStore.KeySchema {
+
+ private static final int SUFFIX_SIZE = 2 * WindowStoreUtils.TIMESTAMP_SIZE;
+ private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
+
private String topic;
@Override
@@ -35,33 +40,49 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
}
@Override
- public Bytes upperRange(final Bytes key, final long to) {
+ public Bytes upperRangeFixedSize(final Bytes key, final long to) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
}
@Override
- public Bytes lowerRange(final Bytes key, final long from) {
+ public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic);
}
@Override
+ public Bytes upperRange(Bytes key, long to) {
+ final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE)
+ .putLong(to)
+ // start can at most be equal to end
+ .putLong(to)
+ .array();
+ return OrderedBytes.upperRange(key, maxSuffix);
+ }
+
+ @Override
+ public Bytes lowerRange(Bytes key, long from) {
+ return OrderedBytes.lowerRange(key, MIN_SUFFIX);
+ }
+
+ @Override
public long segmentTimestamp(final Bytes key) {
return SessionKeySerde.extractEnd(key.get());
}
@Override
- public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
- if (windowedKey.key().equals(binaryKey)
- && windowedKey.window().end() >= from
- && windowedKey.window().start() <= to) {
+ if (windowedKey.key().compareTo(binaryKeyFrom) >= 0
+ && windowedKey.key().compareTo(binaryKeyTo) <= 0
+ && windowedKey.window().end() >= from
+ && windowedKey.window().start() <= to) {
return true;
}
iterator.next();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index b9a8665..214f36b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -21,9 +21,14 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
+import java.nio.ByteBuffer;
import java.util.List;
class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+
+ private static final int SUFFIX_SIZE = WindowStoreUtils.TIMESTAMP_SIZE + WindowStoreUtils.SEQNUM_SIZE;
+ private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
+
private StateSerdes<Bytes, byte[]> serdes;
@Override
@@ -33,21 +38,36 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
@Override
public Bytes upperRange(final Bytes key, final long to) {
- return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+ final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE)
+ .putLong(to)
+ .putInt(Integer.MAX_VALUE)
+ .array();
+
+ return OrderedBytes.upperRange(key, maxSuffix);
}
@Override
public Bytes lowerRange(final Bytes key, final long from) {
+ return OrderedBytes.lowerRange(key, MIN_SUFFIX);
+ }
+
+ @Override
+ public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes);
}
@Override
+ public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+ return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+ }
+
+ @Override
public long segmentTimestamp(final Bytes key) {
return WindowStoreUtils.timestampFromBinaryKey(key.get());
}
@Override
- public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
@@ -55,9 +75,10 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
- if (keyBytes.equals(binaryKey)
- && time >= from
- && time <= to) {
+ if (keyBytes.compareTo(binaryKeyFrom) >= 0
+ && keyBytes.compareTo(binaryKeyTo) <= 0
+ && time >= from
+ && time <= to) {
return true;
}
iterator.next();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
new file mode 100644
index 0000000..4fd6f3e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreIteratorWrapper.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+class WindowStoreIteratorWrapper<K, V> {
+
+ // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
+ private static class WrappedWindowStoreBytesIterator extends WindowStoreIteratorWrapper<Bytes, byte[]> {
+ WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+ final StateSerdes<Bytes, byte[]> serdes,
+ final long windowSize) {
+ super(underlying, serdes, windowSize);
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> valuesIterator() {
+ return new WrappedWindowStoreIterator<byte[]>(bytesIterator, serdes) {
+ @Override
+ public KeyValue<Long, byte[]> next() {
+ final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+ return KeyValue.pair(timestamp, next.value);
+ }
+ };
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator() {
+ return new WrappedKeyValueIterator<Bytes, byte[]>(bytesIterator, serdes, windowSize) {
+ @Override
+ public Windowed<Bytes> peekNextKey() {
+ final Bytes next = bytesIterator.peekNextKey();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
+ final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
+ return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+ }
+
+ @Override
+ public KeyValue<Windowed<Bytes>, byte[]> next() {
+ if (!bytesIterator.hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+ final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.key.get());
+ return KeyValue.pair(
+ new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
+ next.value
+ );
+ }
+ };
+ }
+ }
+
+ static WindowStoreIteratorWrapper<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+ final StateSerdes<Bytes, byte[]> serdes,
+ final long windowSize) {
+ return new WrappedWindowStoreBytesIterator(underlying, serdes, windowSize);
+ }
+
+
+ protected final KeyValueIterator<Bytes, byte[]> bytesIterator;
+ protected final StateSerdes<K, V> serdes;
+ protected final long windowSize;
+
+ WindowStoreIteratorWrapper(
+ final KeyValueIterator<Bytes, byte[]> bytesIterator,
+ final StateSerdes<K, V> serdes,
+ final long windowSize
+ ) {
+ this.bytesIterator = bytesIterator;
+ this.serdes = serdes;
+ this.windowSize = windowSize;
+ }
+
+ public WindowStoreIterator<V> valuesIterator() {
+ return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
+ }
+
+ public KeyValueIterator<Windowed<K>, V> keyValueIterator() {
+ return new WrappedKeyValueIterator<>(bytesIterator, serdes, windowSize);
+ }
+
+ private static class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator;
+ final StateSerdes<?, V> serdes;
+
+ WrappedWindowStoreIterator(
+ KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<?, V> serdes) {
+ this.bytesIterator = bytesIterator;
+ this.serdes = serdes;
+ }
+
+ @Override
+ public Long peekNextKey() {
+ return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return bytesIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<Long, V> next() {
+ final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+ final V value = serdes.valueFrom(next.value);
+ return KeyValue.pair(timestamp, value);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+ }
+
+ @Override
+ public void close() {
+ bytesIterator.close();
+ }
+ }
+
+ private static class WrappedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+ final KeyValueIterator<Bytes, byte[]> bytesIterator;
+ final StateSerdes<K, V> serdes;
+ final long windowSize;
+
+ WrappedKeyValueIterator(
+ KeyValueIterator<Bytes, byte[]> bytesIterator, StateSerdes<K, V> serdes, long windowSize) {
+ this.bytesIterator = bytesIterator;
+ this.serdes = serdes;
+ this.windowSize = windowSize;
+ }
+
+ @Override
+ public Windowed<K> peekNextKey() {
+ final byte[] nextKey = bytesIterator.peekNextKey().get();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(nextKey);
+ final K key = WindowStoreUtils.keyFromBinaryKey(nextKey, serdes);
+ return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return bytesIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<Windowed<K>, V> next() {
+ final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+ final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+ final K key = WindowStoreUtils.keyFromBinaryKey(next.key.get(), serdes);
+ final V value = serdes.valueFrom(next.value);
+ return KeyValue.pair(
+ new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)),
+ value
+ );
+
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+ }
+
+ @Override
+ public void close() {
+ bytesIterator.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index faf2899..ed79947 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -19,14 +19,15 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
public class WindowStoreUtils {
- private static final int SEQNUM_SIZE = 4;
- private static final int TIMESTAMP_SIZE = 8;
+ static final int SEQNUM_SIZE = 4;
+ static final int TIMESTAMP_SIZE = 8;
/** Inner byte array serde used for segments */
static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
@@ -73,4 +74,13 @@ public class WindowStoreUtils {
static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
}
+
+ /**
+ * Safely construct a time window of the given size,
+ * taking care of bounding endMs to Long.MAX_VALUE if necessary
+ */
+ static TimeWindow timeWindowForSize(final long startMs, final long windowSize) {
+ final long endMs = startMs + windowSize;
+ return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
deleted file mode 100644
index 1ce6b04..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.util.NoSuchElementException;
-
-class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> {
- final KeyValueIterator<Bytes, byte[]> bytesIterator;
- private final StateSerdes<?, V> serdes;
-
- // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
- private static class WrappedWindowStoreBytesIterator extends WrappedWindowStoreIterator<byte[]> {
- WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
- final StateSerdes<Bytes, byte[]> serdes) {
- super(underlying, serdes);
- }
-
- @Override
- public KeyValue<Long, byte[]> next() {
- if (!bytesIterator.hasNext()) {
- throw new NoSuchElementException();
- }
-
- final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
- final byte[] value = next.value;
- return KeyValue.pair(timestamp, value);
- }
- }
-
- static WrappedWindowStoreIterator<byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
- final StateSerdes<Bytes, byte[]> serdes) {
- return new WrappedWindowStoreBytesIterator(underlying, serdes);
- }
-
- WrappedWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) {
- this.bytesIterator = bytesIterator;
- this.serdes = serdes;
- }
-
- @Override
- public boolean hasNext() {
- return bytesIterator.hasNext();
- }
-
- /**
- * @throws NoSuchElementException if no next element exists
- */
- @Override
- public KeyValue<Long, V> next() {
- final KeyValue<Bytes, byte[]> next = bytesIterator.next();
- final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
- final V value = serdes.valueFrom(next.value);
- return KeyValue.pair(timestamp, value);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
- }
-
- @Override
- public void close() {
- bytesIterator.close();
- }
-
- @Override
- public Long peekNextKey() {
- return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 24e0329..97a1408 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -131,23 +131,23 @@ public class KStreamWindowAggregateTest {
assertEquals(Utils.mkList(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1",
-
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3",
-
- "[A@5]:0+1+1", "[A@10]:0+1",
- "[B@5]:0+2+2+2", "[B@10]:0+2",
- "[D@5]:0+4+4", "[D@10]:0+4",
- "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
- "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+ "[A@0/10]:0+1",
+ "[B@0/10]:0+2",
+ "[C@0/10]:0+3",
+ "[D@0/10]:0+4",
+ "[A@0/10]:0+1+1",
+
+ "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+ "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+ "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+ "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+ "[C@0/10]:0+3+3", "[C@5/15]:0+3",
+
+ "[A@5/15]:0+1+1", "[A@10/20]:0+1",
+ "[B@5/15]:0+2+2+2", "[B@10/20]:0+2",
+ "[D@5/15]:0+4+4", "[D@10/20]:0+4",
+ "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2",
+ "[C@5/15]:0+3+3", "[C@10/20]:0+3"), proc2.processed);
}
private void setRecordContext(final long time, final String topic) {
@@ -210,11 +210,11 @@ public class KStreamWindowAggregateTest {
driver.flushState();
proc1.checkAndClearProcessResult(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1"
+ "[A@0/10]:0+1",
+ "[B@0/10]:0+2",
+ "[C@0/10]:0+3",
+ "[D@0/10]:0+4",
+ "[A@0/10]:0+1+1"
);
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult();
@@ -236,11 +236,11 @@ public class KStreamWindowAggregateTest {
driver.flushState();
proc1.checkAndClearProcessResult(
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3"
+ "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+ "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+ "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+ "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+ "[C@0/10]:0+3+3", "[C@5/15]:0+3"
);
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult();
@@ -263,18 +263,18 @@ public class KStreamWindowAggregateTest {
proc1.checkAndClearProcessResult();
proc2.checkAndClearProcessResult(
- "[A@0]:0+a",
- "[B@0]:0+b",
- "[C@0]:0+c",
- "[D@0]:0+d",
- "[A@0]:0+a+a"
+ "[A@0/10]:0+a",
+ "[B@0/10]:0+b",
+ "[C@0/10]:0+c",
+ "[D@0/10]:0+d",
+ "[A@0/10]:0+a+a"
);
proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a",
- "[B@0]:0+2+2+2%0+b",
- "[C@0]:0+3+3%0+c",
- "[D@0]:0+4+4%0+d",
- "[A@0]:0+1+1+1%0+a+a");
+ "[A@0/10]:0+1+1+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b",
+ "[C@0/10]:0+3+3%0+c",
+ "[D@0/10]:0+4+4%0+d",
+ "[A@0/10]:0+1+1+1%0+a+a");
setRecordContext(5, topic1);
driver.process(topic2, "A", "a");
@@ -293,18 +293,18 @@ public class KStreamWindowAggregateTest {
driver.flushState();
proc1.checkAndClearProcessResult();
proc2.checkAndClearProcessResult(
- "[A@0]:0+a+a+a", "[A@5]:0+a",
- "[B@0]:0+b+b", "[B@5]:0+b",
- "[D@0]:0+d+d", "[D@5]:0+d",
- "[B@0]:0+b+b+b", "[B@5]:0+b+b",
- "[C@0]:0+c+c", "[C@5]:0+c"
+ "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+ "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+ "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+ "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+ "[C@0/10]:0+c+c", "[C@5/15]:0+c"
);
proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
- "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
- "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
- "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
- "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+ "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+ "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+ "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+ "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index e7bd187..3ad6475 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -55,4 +56,9 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
return null;
}
+
+ @Override
+ public WindowStoreIterator<KeyValue> fetch(Object from, Object to, long timeFrom, long timeTo) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index f8eec1c..bfc20ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -63,11 +63,15 @@ public class CachingSessionStoreTest {
public void setUp() throws Exception {
final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
- underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema);
+ final int retention = 60000;
+ final int numSegments = 3;
+ underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, schema);
final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
cachingStore = new CachingSessionStore<>(sessionStore,
Serdes.String(),
- Serdes.Long());
+ Serdes.Long(),
+ Segments.segmentInterval(retention, numSegments)
+ );
cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
@@ -86,6 +90,8 @@ public class CachingSessionStoreTest {
cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
+ assertEquals(3, cache.size());
+
final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);
@@ -93,7 +99,35 @@ public class CachingSessionStoreTest {
assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next());
assertFalse(a.hasNext());
assertFalse(b.hasNext());
+ }
+
+ @Test
+ public void shouldPutFetchAllKeysFromCache() throws Exception {
+ cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+ cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
+ cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
+
+ assertEquals(3, cache.size());
+
+ final KeyValueIterator<Windowed<String>, Long> all = cachingStore.findSessions("a", "b", 0, 0);
+ assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), all.next());
+ assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), all.next());
+ assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), all.next());
+ assertFalse(all.hasNext());
+ }
+
+ @Test
+ public void shouldPutFetchRangeFromCache() throws Exception {
+ cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+ cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L);
+ cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L);
+
assertEquals(3, cache.size());
+
+ final KeyValueIterator<Windowed<String>, Long> some = cachingStore.findSessions("aa", "b", 0, 0);
+ assertEquals(KeyValue.pair(new Windowed<>("aa", new SessionWindow(0, 0)), 1L), some.next());
+ assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), some.next());
+ assertFalse(some.hasNext());
}
@Test
@@ -164,6 +198,29 @@ public class CachingSessionStoreTest {
}
@Test
+ public void shouldFetchRangeCorrectlyAcrossSegments() throws Exception {
+ final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
+ final Windowed<String> aa1 = new Windowed<>("aa", new SessionWindow(0, 0));
+ final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
+ final Windowed<String> a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+ final Windowed<String> aa3 = new Windowed<>("aa", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+ cachingStore.put(a1, 1L);
+ cachingStore.put(aa1, 1L);
+ cachingStore.put(a2, 2L);
+ cachingStore.put(a3, 3L);
+ cachingStore.put(aa3, 3L);
+ cachingStore.flush();
+
+ final KeyValueIterator<Windowed<String>, Long> rangeResults = cachingStore.findSessions("a", "aa", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+ assertEquals(a1, rangeResults.next().key);
+ assertEquals(aa1, rangeResults.next().key);
+ assertEquals(a2, rangeResults.next().key);
+ assertEquals(a3, rangeResults.next().key);
+ assertEquals(aa3, rangeResults.next().key);
+ assertFalse(rangeResults.hasNext());
+ }
+
+ @Test
public void shouldForwardChangedValuesDuringFlush() throws Exception {
final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 5b3123e..faf6e83 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -64,13 +64,16 @@ public class CachingWindowStoreTest {
@Before
public void setUp() throws Exception {
keySchema = new WindowKeySchema();
- underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
- final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
+ final int retention = 30000;
+ final int numSegments = 3;
+ underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, keySchema);
+ final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
cachingStore = new CachingWindowStore<>(windowStore,
Serdes.String(),
Serdes.String(),
- WINDOW_SIZE);
+ WINDOW_SIZE,
+ Segments.segmentInterval(retention, numSegments));
cachingStore.setFlushListener(cacheListener);
cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
topic = "topic";
@@ -100,6 +103,19 @@ public class CachingWindowStoreTest {
}
@Test
+ public void shouldPutFetchRangeFromCache() throws Exception {
+ cachingStore.put("a", "a");
+ cachingStore.put("b", "b");
+
+ final KeyValueIterator<Windowed<String>, String> iterator = cachingStore.fetch("a", "b", 10, 10);
+ assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), iterator.next());
+ assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "b"), iterator.next());
+ assertFalse(iterator.hasNext());
+ assertEquals(2, cache.size());
+ }
+
+
+ @Test
public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
int added = addItemsToCache();
// all dirty entries should have been flushed
@@ -171,6 +187,19 @@ public class CachingWindowStoreTest {
}
@Test
+ public void shouldIterateCacheAndStoreKeyRange() throws Exception {
+ final Bytes key = Bytes.wrap("1" .getBytes());
+ underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes());
+ cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
+
+ final KeyValueIterator<Windowed<String>, String> fetchRange =
+ cachingStore.fetch("1", "2", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
+ assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a"), fetchRange.next());
+ assertEquals(KeyValue.pair(new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)), "b"), fetchRange.next());
+ assertFalse(fetchRange.hasNext());
+ }
+
+ @Test
public void shouldClearNamespaceCacheOnClose() throws Exception {
cachingStore.put("a", "a");
assertEquals(1, cache.size());
@@ -185,12 +214,17 @@ public class CachingWindowStoreTest {
}
@Test(expected = InvalidStateStoreException.class)
+ public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() throws Exception {
+ cachingStore.close();
+ cachingStore.fetch("a", "b", 0, 10);
+ }
+
+ @Test(expected = InvalidStateStoreException.class)
public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
cachingStore.close();
cachingStore.put("a", "a");
}
- @SuppressWarnings("unchecked")
@Test
public void shouldFetchAndIterateOverExactKeys() throws Exception {
cachingStore.put("a", "0001", 0);
@@ -203,6 +237,34 @@ public class CachingWindowStoreTest {
assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
}
+ @Test
+ public void shouldFetchAndIterateOverKeyRange() throws Exception {
+ cachingStore.put("a", "0001", 0);
+ cachingStore.put("aa", "0002", 0);
+ cachingStore.put("a", "0003", 1);
+ cachingStore.put("aa", "0004", 1);
+ cachingStore.put("a", "0005", 60000);
+
+ assertThat(
+ toList(cachingStore.fetch("a", "a", 0, Long.MAX_VALUE)),
+ equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)))
+ );
+
+ assertThat(
+ toList(cachingStore.fetch("aa", "aa", 0, Long.MAX_VALUE)),
+ equalTo(Utils.mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)))
+ );
+
+ assertThat(
+ toList(cachingStore.fetch("a", "aa", 0, Long.MAX_VALUE)),
+ equalTo(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)))
+ );
+ }
+
+ private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
+ return KeyValue.pair(new Windowed<>(key, new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), value);
+ }
+
private int addItemsToCache() throws IOException {
int cachedSize = 0;
int i = 0;
@@ -216,4 +278,4 @@ public class CachingWindowStoreTest {
return i;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 6f4ff07..b6e95a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -38,6 +38,7 @@ import static org.junit.Assert.assertEquals;
public class CompositeReadOnlyWindowStoreTest {
+ private static final long WINDOW_SIZE = 30_000;
private final String storeName = "window-store";
private StateStoreProviderStub stubProviderOne;
private StateStoreProviderStub stubProviderTwo;
@@ -54,10 +55,10 @@ public class CompositeReadOnlyWindowStoreTest {
public void before() {
stubProviderOne = new StateStoreProviderStub(false);
stubProviderTwo = new StateStoreProviderStub(false);
- underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
+ underlyingWindowStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
stubProviderOne.addStore(storeName, underlyingWindowStore);
- otherUnderlyingStore = new ReadOnlyWindowStoreStub<>();
+ otherUnderlyingStore = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
stubProviderOne.addStore("other-window-store", otherUnderlyingStore);
@@ -89,7 +90,7 @@ public class CompositeReadOnlyWindowStoreTest {
@Test
public void shouldFindValueForKeyWhenMultiStores() throws Exception {
final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
- ReadOnlyWindowStoreStub<>();
+ ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
stubProviderTwo.addStore(storeName, secondUnderlying);
underlyingWindowStore.put("key-one", "value-one", 0L);
@@ -162,4 +163,4 @@ public class CompositeReadOnlyWindowStoreTest {
windowStoreIterator.next();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
index acded8c..6cc77df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -34,6 +34,18 @@ import static org.junit.Assert.assertTrue;
public class FilteredCacheIteratorTest {
+ private static final CacheFunction IDENTITY_FUNCTION = new CacheFunction() {
+ @Override
+ public Bytes key(Bytes cacheKey) {
+ return cacheKey;
+ }
+
+ @Override
+ public Bytes cacheKey(Bytes key) {
+ return key;
+ }
+ };
+
@SuppressWarnings("unchecked")
private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
@@ -58,8 +70,8 @@ public class FilteredCacheIteratorTest {
}
};
allIterator = new FilteredCacheIterator(
- new DelegatingPeekingKeyValueIterator<>("",
- store.all()), allCondition);
+ new DelegatingPeekingKeyValueIterator<>("",
+ store.all()), allCondition, IDENTITY_FUNCTION);
final HasNextCondition firstEntryCondition = new HasNextCondition() {
@Override
@@ -69,7 +81,7 @@ public class FilteredCacheIteratorTest {
};
firstEntryIterator = new FilteredCacheIterator(
new DelegatingPeekingKeyValueIterator<>("",
- store.all()), firstEntryCondition);
+ store.all()), firstEntryCondition, IDENTITY_FUNCTION);
}
@@ -115,4 +127,4 @@ public class FilteredCacheIteratorTest {
allIterator.remove();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
index d3d8f40..ee5e529 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -36,6 +36,13 @@ import static org.junit.Assert.assertTrue;
public class MergedSortedCacheWrappedSessionStoreIteratorTest {
+ private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) {
+ @Override
+ public long segmentId(Bytes key) {
+ return 0;
+ }
+ };
+
private final String storeKey = "a";
private final String cacheKey = "b";
@@ -43,10 +50,13 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
private final SessionWindow cacheWindow = new SessionWindow(10, 20);
- private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
- SessionKeySerde.toBinary(
- new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes())))
- .iterator();
+ private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
+ KeyValue.pair(
+ SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
+ SessionKeySerde.toBinary(new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy")
+ ),
+ new LRUCacheEntry(cacheKey.getBytes())
+ )).iterator();
@Test
public void shouldHaveNextFromStore() throws Exception {
@@ -106,7 +116,10 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
= new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
- return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()));
+ return new MergedSortedCacheSessionStoreIterator<>(
+ cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()),
+ SINGLE_SEGMENT_CACHE_FUNCTION
+ );
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index 2048688..fed39b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -36,6 +36,13 @@ import static org.junit.Assert.assertEquals;
public class MergedSortedCacheWrappedWindowStoreIteratorTest {
+ private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) {
+ @Override
+ public long segmentId(Bytes key) {
+ return 0;
+ }
+ };
+
private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
private final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
private final String namespace = "one";
@@ -52,16 +59,20 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
final Bytes keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
final byte[] valBytes = String.valueOf(t + 10).getBytes();
expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
- cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes));
+ cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
}
Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
+ namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
+ );
- final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+ final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(
+ cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())
+ );
int index = 0;
while (iterator.hasNext()) {
final KeyValue<Long, byte[]> next = iterator.next();
@@ -74,12 +85,16 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
@Test
public void shouldPeekNextStoreKey() throws Exception {
windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
- cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+ cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
- final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(
+ namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
+ );
+ final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(
+ cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())
+ );
assertThat(iterator.peekNextKey(), equalTo(0L));
iterator.next();
assertThat(iterator.peekNextKey(), equalTo(10L));
@@ -88,15 +103,14 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
@Test
public void shouldPeekNextCacheKey() throws Exception {
windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
- cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+ cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes));
final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
assertThat(iterator.peekNextKey(), equalTo(0L));
iterator.next();
assertThat(iterator.peekNextKey(), equalTo(10L));
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
new file mode 100644
index 0000000..114a150
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
+ private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION = new SegmentedCacheFunction(null, -1) {
+ @Override
+ public long segmentId(Bytes key) {
+ return 0;
+ }
+ };
+ private static final int WINDOW_SIZE = 10;
+
+ private final String storeKey = "a";
+ private final String cacheKey = "b";
+
+ private final TimeWindow storeWindow = new TimeWindow(0, 1);
+ private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
+ KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
+ private final TimeWindow cacheWindow = new TimeWindow(10, 20);
+ private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(
+ KeyValue.pair(
+ SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(
+ WindowStoreUtils.toBinaryKey(
+ cacheKey, cacheWindow.start(), 0,
+ new StateSerdes<>("dummy", Serdes.String(), Serdes.String())
+ )
+ ),
+ new LRUCacheEntry(cacheKey.getBytes())
+ )).iterator();
+
+ @Test
+ public void shouldHaveNextFromStore() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+ assertTrue(mergeIterator.hasNext());
+ }
+
+ @Test
+ public void shouldGetNextFromStore() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+ assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+ }
+
+ @Test
+ public void shouldPeekNextKeyFromStore() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+ assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow)));
+ }
+
+ @Test
+ public void shouldHaveNextFromCache() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
+ cacheKvs);
+ assertTrue(mergeIterator.hasNext());
+ }
+
+ @Test
+ public void shouldGetNextFromCache() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+ assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+ }
+
+ @Test
+ public void shouldPeekNextKeyFromCache() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> mergeIterator
+ = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+ assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow)));
+ }
+
+ @Test
+ public void shouldIterateBothStoreAndCache() throws Exception {
+ final MergedSortedCacheWindowStoreKeyValueIterator<String, String> iterator = createIterator(storeKvs, cacheKvs);
+ assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+ assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+ assertFalse(iterator.hasNext());
+ }
+
+ private MergedSortedCacheWindowStoreKeyValueIterator<String, String> createIterator(
+ final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
+ final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
+ ) {
+ final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
+ = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
+
+ final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
+ = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
+ return new MergedSortedCacheWindowStoreKeyValueIterator<>(
+ cacheIterator,
+ storeIterator,
+ new StateSerdes<>("name", Serdes.String(), Serdes.String()),
+ WINDOW_SIZE,
+ SINGLE_SEGMENT_CACHE_FUNCTION
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index e37e0b4..6974240 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -18,8 +18,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -28,15 +31,23 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
/**
* A very simple window store stub for testing purposes.
*/
public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
- private final Map<Long, Map<K, V>> data = new HashMap<>();
+ private final long windowSize;
+ private final Map<Long, NavigableMap<K, V>> data = new HashMap<>();
private boolean open = true;
+ public ReadOnlyWindowStoreStub(long windowSize) {
+ this.windowSize = windowSize;
+ }
+
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
if (!open) {
@@ -52,9 +63,54 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
return new TheWindowStoreIterator<>(results.iterator());
}
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
+ if (!open) {
+ throw new InvalidStateStoreException("Store is not open");
+ }
+ final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+ for (long now = timeFrom; now <= timeTo; now++) {
+ final NavigableMap<K, V> kvMap = data.get(now);
+ if (kvMap != null) {
+ for (Entry<K, V> entry : kvMap.subMap(from, true, to, true).entrySet()) {
+ results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue()));
+ }
+ }
+ }
+ final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
+
+ return new KeyValueIterator<Windowed<K>, V>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Windowed<K> peekNextKey() {
+ throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<Windowed<K>, V> next() {
+ return iterator.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
+ }
+ };
+ }
+
public void put(final K key, final V value, final long timestamp) {
if (!data.containsKey(timestamp)) {
- data.put(timestamp, new HashMap<K, V>());
+ data.put(timestamp, new TreeMap<K, V>());
}
data.get(timestamp).put(key, value);
}
@@ -123,7 +179,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
@Override
public void remove() {
-
+ throw new UnsupportedOperationException("remove() not supported in " + getClass().getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e2875235/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index f5998dc..c30b0e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -168,13 +168,30 @@ public class RocksDBSessionStoreTest {
sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
- final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
- final List<Long> results = new ArrayList<>();
+ KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+ List<Long> results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(iterator.next().value);
}
assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L)));
+
+
+ iterator = sessionStore.findSessions("aa", 0, Long.MAX_VALUE);
+ results = new ArrayList<>();
+ while (iterator.hasNext()) {
+ results.add(iterator.next().value);
+ }
+
+ assertThat(results, equalTo(Arrays.asList(2L, 4L)));
+
+
+ final KeyValueIterator<Windowed<String>, Long> rangeIterator = sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE);
+ final List<Long> rangeResults = new ArrayList<>();
+ while (rangeIterator.hasNext()) {
+ rangeResults.add(rangeIterator.next().value);
+ }
+ assertThat(rangeResults, equalTo(Arrays.asList(1L, 3L, 2L, 4L, 5L)));
}
static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {