You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/21 23:12:03 UTC
kafka git commit: KAFKA-4863 Follow-up: Querying window store may return unwanted keys
Repository: kafka
Updated Branches:
refs/heads/trunk fef7fca2a -> c74eec96c
KAFKA-4863 Follow-up: Querying window store may return unwanted keys
iterate over all keys returned from the rocksdb iterator so we don't miss any results
Author: Damian Guy <da...@gmail.com>
Reviewers: Xavier Léauté, Guozhang Wang
Closes #2713 from dguy/window-iter
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c74eec96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c74eec96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c74eec96
Branch: refs/heads/trunk
Commit: c74eec96c5e8828a88df67f6eb21d2369b340a22
Parents: fef7fca
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 21 16:11:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 21 16:11:50 2017 -0700
----------------------------------------------------------------------
.../kstream/internals/SessionKeySerde.java | 2 +-
.../state/internals/SessionKeySchema.java | 14 ++--
.../state/internals/WindowKeySchema.java | 12 ++--
.../internals/RocksDBSessionStoreTest.java | 39 +++++++++--
.../state/internals/RocksDBWindowStoreTest.java | 47 ++++++++++++-
.../state/internals/SessionKeySchemaTest.java | 74 ++++++++++++++++++++
6 files changed, 167 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 7eb8300..7a85c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -120,7 +120,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
}
- public static Window extractWindow(final byte[] binaryKey) {
+ static Window extractWindow(final byte[] binaryKey) {
final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index cd2a4f6..7d6761c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -50,15 +50,15 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
- if (iterator.hasNext()) {
+ while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
- final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get()));
- if (!keyBytes.equals(binaryKey)) {
- return false;
+ final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
+ if (windowedKey.key().equals(binaryKey)
+ && windowedKey.window().end() >= from
+ && windowedKey.window().start() <= to) {
+ return true;
}
- final long start = SessionKeySerde.extractStart(bytes.get());
- final long end = SessionKeySerde.extractEnd(bytes.get());
- return end >= from && start <= to;
+ iterator.next();
}
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 0a89da7..76faf0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -46,14 +46,16 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
- if (iterator.hasNext()) {
+ while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
- if (!keyBytes.equals(binaryKey)) {
- return false;
- }
final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
- return time >= from && time <= to;
+ if (keyBytes.equals(binaryKey)
+ && time >= from
+ && time <= to) {
+ return true;
+ }
+ iterator.next();
}
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 9082da0..9be7c10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -35,6 +35,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -42,6 +44,7 @@ import static org.junit.Assert.assertTrue;
public class RocksDBSessionStoreTest {
private SessionStore<String, Long> sessionStore;
+ private MockProcessorContext context;
@Before
public void before() {
@@ -52,11 +55,11 @@ public class RocksDBSessionStoreTest {
Serdes.String(),
Serdes.Long());
- final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+ context = new MockProcessorContext(TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
sessionStore.init(context, sessionStore);
}
@@ -144,6 +147,32 @@ public class RocksDBSessionStoreTest {
assertFalse(results.hasNext());
}
+ @Test // KAFKA-4863 follow-up: findSessions("a", ...) must return every session for "a" and none for "aa".
+ public void shouldFetchExactKeys() throws Exception {
+ final RocksDBSegmentedBytesStore bytesStore =
+ new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 2, new SessionKeySchema());
+ // Dedicated store with a very large bound and 2 segments, presumably so all records below share one segment (confirm against RocksDBSegmentedBytesStore).
+ sessionStore = new RocksDBSessionStore<>(bytesStore,
+ Serdes.String(),
+ Serdes.Long());
+ // Replaces the store built in before(); reuses the context field initialised there.
+ sessionStore.init(context, sessionStore);
+ // Sessions for "a" and its longer sibling "aa" are mixed; only the "a" values (1, 3, 5) should come back.
+ sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L);
+ sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
+ sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
+ // Binary key is key|end|start; end 0x79FF.. has high byte 0x79 > 'a' (0x61), so (assuming bytewise order) this "a" entry sorts after the "aa" entries and the iterator must skip past them.
+ final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+ final List<Long> results = new ArrayList<>();
+ while (iterator.hasNext()) {
+ results.add(iterator.next().value);
+ }
+ // Before the fix, iteration stopped at the first "aa" entry and value 5 was missed.
+ assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L)));
+ }
+
static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
while (iterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 7352673..012c4ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -622,11 +623,11 @@ public class RocksDBWindowStoreTest {
final RocksDBWindowStoreSupplier<String, String> supplier =
new RocksDBWindowStoreSupplier<>(
"window",
- 60 * 1000L * 2, 3,
+ 0x7a00000000000000L, 2,
true,
Serdes.String(),
Serdes.String(),
- windowSize,
+ 0x7a00000000000000L,
true,
Collections.<String, String>emptyMap(),
false);
@@ -638,12 +639,52 @@ public class RocksDBWindowStoreTest {
windowStore.put("aa", "0002", 0);
windowStore.put("a", "0003", 1);
windowStore.put("aa", "0004", 1);
- windowStore.put("a", "0005", 60000);
+ windowStore.put("a", "0005", 0x7a00000000000000L - 1);
+
final List expected = Utils.mkList("0001", "0003", "0005");
assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception { // KAFKA-4863 follow-up: fetch must return exact key matches even when keys are byte prefixes of each other.
+ final RocksDBWindowStoreSupplier<Bytes, String> supplier =
+ new RocksDBWindowStoreSupplier<>(
+ "window",
+ 60000, 2,
+ true,
+ Serdes.Bytes(),
+ Serdes.String(),
+ 60000,
+ true,
+ Collections.<String, String>emptyMap(),
+ false);
+ // Bytes-keyed window store; the 60000 values are presumably retention and window size in ms, with 2 segments (confirm against the supplier's parameter order).
+ windowStore = supplier.get();
+ windowStore.init(context, windowStore);
+ // key1 is a byte prefix of key2, which is a prefix of key3, so their stored entries may interleave and each fetch must skip the other keys.
+ final Bytes key1 = Bytes.wrap(new byte[]{0});
+ final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
+ final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+ windowStore.put(key1, "1", 0);
+ windowStore.put(key2, "2", 0);
+ windowStore.put(key3, "3", 0);
+ windowStore.put(key1, "4", 1);
+ windowStore.put(key2, "5", 1);
+ windowStore.put(key3, "6", 59999);
+ windowStore.put(key1, "7", 59999);
+ windowStore.put(key2, "8", 59999);
+ windowStore.put(key3, "9", 59999);
+ // Each fetch over 0..Long.MAX_VALUE must yield exactly its own key's three values.
+ final List expectedKey1 = Utils.mkList("1", "4", "7");
+ assertThat(toList(windowStore.fetch(key1, 0, Long.MAX_VALUE)), equalTo(expectedKey1));
+ final List expectedKey2 = Utils.mkList("2", "5", "8");
+ assertThat(toList(windowStore.fetch(key2, 0, Long.MAX_VALUE)), equalTo(expectedKey2));
+ final List expectedKey3 = Utils.mkList("3", "6", "9");
+ assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
+ }
+
private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
context.setRecordContext(createRecordContext(startTime));
store.put(0, "zero");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
new file mode 100644
index 0000000..7c085dd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class SessionKeySchemaTest {
+ // Tests SessionKeySchema.hasNextCondition: it must skip entries for other keys (shorter or longer byte strings) instead of stopping at them.
+ private final SessionKeySchema sessionKeySchema = new SessionKeySchema();
+ private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+ // Six sessions over keys {0}, {0,0}, {0,0,0} with values 1..6, presumably yielded in list order by KeyValueIteratorStub (confirm).
+ @Before
+ public void before() {
+ final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
+ KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
+ KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
+ KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
+ KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
+ KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
+ iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
+ }
+ // Key {0} over 0..Long.MAX_VALUE: every window is in range, so only key equality filters; {0,0} and {0,0,0} entries are skipped.
+ @Test
+ public void shouldFetchExactKeysSkippingLongerKeys() throws Exception {
+ final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0}), 0, Long.MAX_VALUE));
+ assertThat(result, equalTo(Arrays.asList(2, 4)));
+ }
+ // Key {0,0}: both the shorter {0} and the longer {0,0,0} entries must be skipped, leaving values 1 and 5.
+ @Test
+ public void shouldFetchExactKeySkippingShorterKeys() throws Exception {
+ final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0, 0}), 0, Long.MAX_VALUE);
+ final List<Integer> results = getValues(hasNextCondition);
+ assertThat(results, equalTo(Arrays.asList(1, 5)));
+ }
+ // Drains the shared iterator through the condition, collecting values; the condition itself advances past non-matching entries.
+
+ private List<Integer> getValues(final HasNextCondition hasNextCondition) {
+ final List<Integer> results = new ArrayList<>();
+ while (hasNextCondition.hasNext(iterator)) {
+ results.add(iterator.next().value);
+ }
+ return results;
+ }
+
+}
\ No newline at end of file