You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 17:57:45 UTC

[kafka] branch trunk updated: KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55dd970  KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)
55dd970 is described below

commit 55dd97097f51e9e9a687c35afa0cfd38d60a6b45
Author: Liju John <li...@users.noreply.github.com>
AuthorDate: Mon May 7 23:27:35 2018 +0530

    KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)
    
    Updated RocksDBSegmentedBytesStoreTest class to include time window serdes: the test is now parameterized and runs against both SessionKeySchema and WindowKeySchema.
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../internals/RocksDBSegmentedBytesStoreTest.java  | 227 ++++++++++++---------
 1 file changed, 132 insertions(+), 95 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index bd2fa91..db6d1d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -22,16 +22,25 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.Parameter;
+
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
 
 import java.io.File;
 import java.text.SimpleDateFormat;
@@ -51,32 +60,58 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-// TODO: this test does not cover time window serdes
+
+@RunWith(Parameterized.class)
 public class RocksDBSegmentedBytesStoreTest {
 
-    private final long retention = 60000L;
+    private final long retention = 1000;
     private final int numSegments = 3;
     private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
-    private final SessionKeySchema schema = new SessionKeySchema();
+    private long windowSizeForTimeWindow = 500;
+    private final Window[] windows = new Window[4];
+
+    @Parameter
+    public SegmentedBytesStore.KeySchema schema;
+
+    @Parameters(name = "{0}")
+    public static Object[] getKeySchemas() {
+        return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+    }
 
     @Before
     public void before() {
         schema.init("topic");
+
+        if (schema instanceof SessionKeySchema) {
+            windows[0] = new SessionWindow(10, 10);
+            windows[1] = new SessionWindow(500, 1000);
+            windows[2] = new SessionWindow(1000, 1500);
+            windows[3] = new SessionWindow(30000, 60000);
+        }
+        if (schema instanceof WindowKeySchema) {
+
+            windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
+            windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
+            windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
+            windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+        }
+
+
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    numSegments,
-                                                    schema);
+                retention,
+                numSegments,
+                schema);
 
         stateDir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(
-            stateDir,
-            Serdes.String(),
-            Serdes.Long(),
-            new NoOpRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+                stateDir,
+                Serdes.String(),
+                Serdes.Long(),
+                new NoOpRecordCollector(),
+                new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 
@@ -88,94 +123,83 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldPutAndFetch() {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 10L))), serializeValue(10L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(500L, 1000L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1500L, 2000L))), serializeValue(100L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(2500L, 3000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+
+        final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
 
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 10L),
-                                                                                    KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
 
-        final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L);
         assertEquals(expected, toList(values));
     }
 
     @Test
     public void shouldFindValuesWithinRange() {
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1000L, 1000L))), serializeValue(10L));
-        final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L);
-        assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+        final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+
+        assertEquals(expected, toList(results));
     }
 
+
     @Test
     public void shouldRemove() {
-        bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))), serializeValue(30L));
-        bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(1500, 2500))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(30));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(50));
 
-        bytesStore.remove(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))));
-        final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L);
+        bytesStore.remove(serializeKey(new Windowed<>("a", windows[0])));
+        final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 100);
         assertFalse(value.hasNext());
     }
 
+
     @Test
     public void shouldRollSegments() {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
-
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(500));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(121000L, 180000L))), serializeValue(300L));
-        assertEquals(Utils.mkSet(segments.segmentName(1),
-                                 segments.segmentName(2),
-                                 segments.segmentName(3)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(1000));
+        assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(181000L, 240000L))), serializeValue(400L));
-        assertEquals(Utils.mkSet(segments.segmentName(2),
-                                 segments.segmentName(3),
-                                 segments.segmentName(4)), segmentDirs());
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
 
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L),
-                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(121000L, 180000L)), 300L),
-                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(181000L, 240000L)), 400L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
+                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), results);
 
     }
 
+
     @Test
     public void shouldGetAllSegments() {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+                segments.segmentName(1)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L),
-                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+        ), results);
 
     }
 
@@ -184,22 +208,18 @@ public class RocksDBSegmentedBytesStoreTest {
         // just to validate directories
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1)), segmentDirs());
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
         assertEquals(Utils.mkSet(segments.segmentName(0),
-                                 segments.segmentName(1),
-                                 segments.segmentName(2)), segmentDirs());
+                segments.segmentName(1)), segmentDirs());
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)
-                                                 ), results);
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+        ), results);
 
     }
 
@@ -207,8 +227,9 @@ public class RocksDBSegmentedBytesStoreTest {
     public void shouldLoadSegementsWithOldStyleDateFormattedName() {
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
         bytesStore.close();
 
         final String firstSegmentName = segments.segmentName(0);
@@ -222,22 +243,24 @@ public class RocksDBSegmentedBytesStoreTest {
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    numSegments,
-                                                    schema);
+                retention,
+                numSegments,
+                schema);
 
         bytesStore.init(context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-                                                  KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
     }
 
+
     @Test
     public void shouldLoadSegementsWithOldStyleColonFormattedName() {
         final Segments segments = new Segments(storeName, retention, numSegments);
         final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
         bytesStore.close();
 
         final String firstSegmentName = segments.segmentName(0);
@@ -247,24 +270,25 @@ public class RocksDBSegmentedBytesStoreTest {
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
-            retention,
-            numSegments,
-            schema);
+                retention,
+                numSegments,
+                schema);
 
         bytesStore.init(context, bytesStore);
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
-            KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
     }
 
+
     @Test
     public void shouldBeAbleToWriteToReInitializedStore() {
         final String key = "a";
         // need to create a segment so we can attempt to write to it again.
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
         bytesStore.close();
         bytesStore.init(context, bytesStore);
-        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
     }
 
     private Set<String> segmentDirs() {
@@ -278,20 +302,33 @@ public class RocksDBSegmentedBytesStoreTest {
     }
 
     private Bytes serializeKey(final Windowed<String> key) {
-        return Bytes.wrap(SessionKeySchema.toBinary(key, Serdes.String().serializer(), "dummy"));
+        final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+        if (schema instanceof SessionKeySchema) {
+            return Bytes.wrap(SessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy"));
+        } else {
+            return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
+        }
     }
 
     private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
         while (iterator.hasNext()) {
             final KeyValue<Bytes, byte[]> next = iterator.next();
-            final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                    SessionKeySchema.from(next.key.get(), Serdes.String().deserializer(), "dummy"),
-                    Serdes.Long().deserializer().deserialize("dummy", next.value)
-            );
-            results.add(deserialized);
+            if (schema instanceof WindowKeySchema) {
+                final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
+                        WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
+                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                );
+                results.add(deserialized);
+            } else {
+                final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
+                        SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
+                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                );
+                results.add(deserialized);
+            }
         }
         return results;
     }
-
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.