You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/21 23:12:03 UTC

kafka git commit: KAFKA-4863 Follow-up: Querying window store may return unwanted keys

Repository: kafka
Updated Branches:
  refs/heads/trunk fef7fca2a -> c74eec96c


KAFKA-4863 Follow-up: Querying window store may return unwanted keys

Iterate over all keys returned from the RocksDB iterator, skipping non-matching entries instead of stopping at the first one, so we don't miss any results.

Author: Damian Guy <da...@gmail.com>

Reviewers: Xavier Léauté, Guozhang Wang

Closes #2713 from dguy/window-iter


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c74eec96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c74eec96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c74eec96

Branch: refs/heads/trunk
Commit: c74eec96c5e8828a88df67f6eb21d2369b340a22
Parents: fef7fca
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 21 16:11:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 21 16:11:50 2017 -0700

----------------------------------------------------------------------
 .../kstream/internals/SessionKeySerde.java      |  2 +-
 .../state/internals/SessionKeySchema.java       | 14 ++--
 .../state/internals/WindowKeySchema.java        | 12 ++--
 .../internals/RocksDBSessionStoreTest.java      | 39 +++++++++--
 .../state/internals/RocksDBWindowStoreTest.java | 47 ++++++++++++-
 .../state/internals/SessionKeySchemaTest.java   | 74 ++++++++++++++++++++
 6 files changed, 167 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 7eb8300..7a85c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -120,7 +120,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
-    public static Window extractWindow(final byte[] binaryKey) {
+    static Window extractWindow(final byte[] binaryKey) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index cd2a4f6..7d6761c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -50,15 +50,15 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         return new HasNextCondition() {
             @Override
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                if (iterator.hasNext()) {
+                while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
-                    final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get()));
-                    if (!keyBytes.equals(binaryKey)) {
-                        return false;
+                    final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
+                    if (windowedKey.key().equals(binaryKey)
+                            && windowedKey.window().end() >= from
+                            && windowedKey.window().start() <= to) {
+                        return true;
                     }
-                    final long start = SessionKeySerde.extractStart(bytes.get());
-                    final long end = SessionKeySerde.extractEnd(bytes.get());
-                    return end >= from && start <= to;
+                    iterator.next();
                 }
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 0a89da7..76faf0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -46,14 +46,16 @@ class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return new HasNextCondition() {
             @Override
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                if (iterator.hasNext()) {
+                while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
                     final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
-                    if (!keyBytes.equals(binaryKey)) {
-                        return false;
-                    }
                     final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
-                    return time >= from && time <= to;
+                    if (keyBytes.equals(binaryKey)
+                            && time >= from
+                            && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
                 }
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 9082da0..9be7c10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -35,6 +35,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -42,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBSessionStoreTest {
 
     private SessionStore<String, Long> sessionStore;
+    private MockProcessorContext context;
 
     @Before
     public void before() {
@@ -52,11 +55,11 @@ public class RocksDBSessionStoreTest {
                                                  Serdes.String(),
                                                  Serdes.Long());
 
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.Long(),
-                                                                      new NoOpRecordCollector(),
-                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        context = new MockProcessorContext(TestUtils.tempDirectory(),
+                                           Serdes.String(),
+                                           Serdes.Long(),
+                                           new NoOpRecordCollector(),
+                                           new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 
@@ -144,6 +147,32 @@ public class RocksDBSessionStoreTest {
         assertFalse(results.hasNext());
     }
 
+    @Test
+    public void shouldFetchExactKeys() throws Exception {
+        final RocksDBSegmentedBytesStore bytesStore =
+                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 2, new SessionKeySchema());
+
+        sessionStore = new RocksDBSessionStore<>(bytesStore,
+                                                 Serdes.String(),
+                                                 Serdes.Long());
+
+        sessionStore.init(context, sessionStore);
+
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
+
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+        final List<Long> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(iterator.next().value);
+        }
+
+        assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L)));
+    }
+
     static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 7352673..012c4ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -622,11 +623,11 @@ public class RocksDBWindowStoreTest {
         final RocksDBWindowStoreSupplier<String, String> supplier =
                 new RocksDBWindowStoreSupplier<>(
                         "window",
-                        60 * 1000L * 2, 3,
+                        0x7a00000000000000L, 2,
                         true,
                         Serdes.String(),
                         Serdes.String(),
-                        windowSize,
+                        0x7a00000000000000L,
                         true,
                         Collections.<String, String>emptyMap(),
                         false);
@@ -638,12 +639,52 @@ public class RocksDBWindowStoreTest {
         windowStore.put("aa", "0002", 0);
         windowStore.put("a", "0003", 1);
         windowStore.put("aa", "0004", 1);
-        windowStore.put("a", "0005", 60000);
+        windowStore.put("a", "0005", 0x7a00000000000000L - 1);
+
 
         final List expected = Utils.mkList("0001", "0003", "0005");
         assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception {
+        final RocksDBWindowStoreSupplier<Bytes, String> supplier =
+                new RocksDBWindowStoreSupplier<>(
+                        "window",
+                        60000, 2,
+                        true,
+                        Serdes.Bytes(),
+                        Serdes.String(),
+                        60000,
+                        true,
+                        Collections.<String, String>emptyMap(),
+                        false);
+
+        windowStore = supplier.get();
+        windowStore.init(context, windowStore);
+
+        final Bytes key1 = Bytes.wrap(new byte[]{0});
+        final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
+        final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+        windowStore.put(key1, "1", 0);
+        windowStore.put(key2, "2", 0);
+        windowStore.put(key3, "3", 0);
+        windowStore.put(key1, "4", 1);
+        windowStore.put(key2, "5", 1);
+        windowStore.put(key3, "6", 59999);
+        windowStore.put(key1, "7", 59999);
+        windowStore.put(key2, "8", 59999);
+        windowStore.put(key3, "9", 59999);
+
+        final List expectedKey1 = Utils.mkList("1", "4", "7");
+        assertThat(toList(windowStore.fetch(key1, 0, Long.MAX_VALUE)), equalTo(expectedKey1));
+        final List expectedKey2 = Utils.mkList("2", "5", "8");
+        assertThat(toList(windowStore.fetch(key2, 0, Long.MAX_VALUE)), equalTo(expectedKey2));
+        final List expectedKey3 = Utils.mkList("3", "6", "9");
+        assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
+    }
+
     private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
         store.put(0, "zero");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c74eec96/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
new file mode 100644
index 0000000..7c085dd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class SessionKeySchemaTest {
+
+    private final SessionKeySchema sessionKeySchema = new SessionKeySchema();
+    private DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator;
+
+    @Before
+    public void before() {
+        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5),
+                                                                  KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6));
+        iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
+    }
+
+    @Test
+    public void shouldFetchExactKeysSkippingLongerKeys() throws Exception {
+        final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0}), 0, Long.MAX_VALUE));
+        assertThat(result, equalTo(Arrays.asList(2, 4)));
+    }
+
+    @Test
+    public void shouldFetchExactKeySkippingShorterKeys() throws Exception {
+        final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0, 0}), 0, Long.MAX_VALUE);
+        final List<Integer> results = getValues(hasNextCondition);
+        assertThat(results, equalTo(Arrays.asList(1, 5)));
+    }
+
+
+    private List<Integer> getValues(final HasNextCondition hasNextCondition) {
+        final List<Integer> results = new ArrayList<>();
+        while (hasNextCondition.hasNext(iterator)) {
+            results.add(iterator.next().value);
+        }
+        return results;
+    }
+
+}
\ No newline at end of file