You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/07 17:57:45 UTC
[kafka] branch trunk updated: KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55dd970 KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)
55dd970 is described below
commit 55dd97097f51e9e9a687c35afa0cfd38d60a6b45
Author: Liju John <li...@users.noreply.github.com>
AuthorDate: Mon May 7 23:27:35 2018 +0530
KAFKA-6628: RocksDBSegmentedBytesStoreTest does not cover time window serdes (#4836)
Updated RocksDBSegmentedBytesStoreTest class to include time window serdes.
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../internals/RocksDBSegmentedBytesStoreTest.java | 227 ++++++++++++---------
1 file changed, 132 insertions(+), 95 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index bd2fa91..db6d1d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -22,16 +22,25 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.Parameter;
+
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
import java.io.File;
import java.text.SimpleDateFormat;
@@ -51,32 +60,58 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-// TODO: this test does not cover time window serdes
+
+@RunWith(Parameterized.class)
public class RocksDBSegmentedBytesStoreTest {
- private final long retention = 60000L;
+ private final long retention = 1000;
private final int numSegments = 3;
private InternalMockProcessorContext context;
private final String storeName = "bytes-store";
private RocksDBSegmentedBytesStore bytesStore;
private File stateDir;
- private final SessionKeySchema schema = new SessionKeySchema();
+ private long windowSizeForTimeWindow = 500;
+ private final Window[] windows = new Window[4];
+
+ @Parameter
+ public SegmentedBytesStore.KeySchema schema;
+
+ @Parameters(name = "{0}")
+ public static Object[] getKeySchemas() {
+ return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+ }
@Before
public void before() {
schema.init("topic");
+
+ if (schema instanceof SessionKeySchema) {
+ windows[0] = new SessionWindow(10, 10);
+ windows[1] = new SessionWindow(500, 1000);
+ windows[2] = new SessionWindow(1000, 1500);
+ windows[3] = new SessionWindow(30000, 60000);
+ }
+ if (schema instanceof WindowKeySchema) {
+
+ windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
+ windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
+ windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
+ windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+ }
+
+
bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- numSegments,
- schema);
+ retention,
+ numSegments,
+ schema);
stateDir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(
- stateDir,
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+ stateDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
bytesStore.init(context, bytesStore);
}
@@ -88,94 +123,83 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldPutAndFetch() {
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 10L))), serializeValue(10L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(500L, 1000L))), serializeValue(50L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1500L, 2000L))), serializeValue(100L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(2500L, 3000L))), serializeValue(200L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+
+ final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 10L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L));
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
- final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L);
assertEquals(expected, toList(values));
}
@Test
public void shouldFindValuesWithinRange() {
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1000L, 1000L))), serializeValue(10L));
- final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L);
- assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+ final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+
+ assertEquals(expected, toList(results));
}
+
@Test
public void shouldRemove() {
- bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))), serializeValue(30L));
- bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(1500, 2500))), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(30));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), serializeValue(50));
- bytesStore.remove(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))));
- final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L);
+ bytesStore.remove(serializeKey(new Windowed<>("a", windows[0])));
+ final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 100);
assertFalse(value.hasNext());
}
+
@Test
public void shouldRollSegments() {
// just to validate directories
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1)), segmentDirs());
-
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1),
- segments.segmentName(2)), segmentDirs());
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(500));
+ assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(121000L, 180000L))), serializeValue(300L));
- assertEquals(Utils.mkSet(segments.segmentName(1),
- segments.segmentName(2),
- segments.segmentName(3)), segmentDirs());
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(1000));
+ assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(181000L, 240000L))), serializeValue(400L));
- assertEquals(Utils.mkSet(segments.segmentName(2),
- segments.segmentName(3),
- segments.segmentName(4)), segmentDirs());
+ final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
- final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000));
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(121000L, 180000L)), 300L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(181000L, 240000L)), 400L)
- ), results);
+ assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
+ KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), results);
}
+
@Test
public void shouldGetAllSegments() {
// just to validate directories
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1)), segmentDirs());
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+ assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1),
- segments.segmentName(2)), segmentDirs());
+ segments.segmentName(1)), segmentDirs());
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L)
- ), results);
+ assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+ ), results);
}
@@ -184,22 +208,18 @@ public class RocksDBSegmentedBytesStoreTest {
// just to validate directories
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1)), segmentDirs());
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+ assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1),
- segments.segmentName(2)), segmentDirs());
+ segments.segmentName(1)), segmentDirs());
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)
- ), results);
+ assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+ ), results);
}
@@ -207,8 +227,9 @@ public class RocksDBSegmentedBytesStoreTest {
public void shouldLoadSegementsWithOldStyleDateFormattedName() {
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
@@ -222,22 +243,24 @@ public class RocksDBSegmentedBytesStoreTest {
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- numSegments,
- schema);
+ retention,
+ numSegments,
+ schema);
bytesStore.init(context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
- assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+ assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
}
+
@Test
public void shouldLoadSegementsWithOldStyleColonFormattedName() {
final Segments segments = new Segments(storeName, retention, numSegments);
final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
@@ -247,24 +270,25 @@ public class RocksDBSegmentedBytesStoreTest {
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- numSegments,
- schema);
+ retention,
+ numSegments,
+ schema);
bytesStore.init(context, bytesStore);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
- assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
- KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+ assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
}
+
@Test
public void shouldBeAbleToWriteToReInitializedStore() {
final String key = "a";
// need to create a segment so we can attempt to write to it again.
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
bytesStore.close();
bytesStore.init(context, bytesStore);
- bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
}
private Set<String> segmentDirs() {
@@ -278,20 +302,33 @@ public class RocksDBSegmentedBytesStoreTest {
}
private Bytes serializeKey(final Windowed<String> key) {
- return Bytes.wrap(SessionKeySchema.toBinary(key, Serdes.String().serializer(), "dummy"));
+ final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+ if (schema instanceof SessionKeySchema) {
+ return Bytes.wrap(SessionKeySchema.toBinary(key, stateSerdes.keySerializer(), "dummy"));
+ } else {
+ return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
+ }
}
private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+ final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
while (iterator.hasNext()) {
final KeyValue<Bytes, byte[]> next = iterator.next();
- final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
- SessionKeySchema.from(next.key.get(), Serdes.String().deserializer(), "dummy"),
- Serdes.Long().deserializer().deserialize("dummy", next.value)
- );
- results.add(deserialized);
+ if (schema instanceof WindowKeySchema) {
+ final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
+ WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
+ stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+ );
+ results.add(deserialized);
+ } else {
+ final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
+ SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
+ stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+ );
+ results.add(deserialized);
+ }
}
return results;
}
-
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.