You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/12 17:57:22 UTC
[kafka] branch trunk updated: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b5d4fa7645e KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370)
b5d4fa7645e is described below
commit b5d4fa7645eb75d2030eb8cac78545a681686a39
Author: Hao Li <11...@users.noreply.github.com>
AuthorDate: Tue Jul 12 10:57:11 2022 -0700
KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370)
1. Added more unit test for RocksDBTimeOrderedSessionStore and RocksDBTimeOrderedSessionSegmentedBytesStore
2. Disable cache for sliding window if emit strategy is ON_WINDOW_CLOSE
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../internals/SessionWindowedKStreamImpl.java | 3 +
.../internals/SlidingWindowedKStreamImpl.java | 4 +-
.../state/internals/PrefixedSessionKeySchemas.java | 14 +--
...cksDBTimeOrderedSessionSegmentedBytesStore.java | 8 +-
.../internals/RocksDBTimeOrderedSessionStore.java | 8 +-
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 138 +++++++++++++++++++++
.../internals/AbstractSessionBytesStoreTest.java | 124 ++++++++++++++++++
.../state/internals/InMemorySessionStoreTest.java | 41 +-----
.../state/internals/RocksDBSessionStoreTest.java | 57 +--------
9 files changed, 291 insertions(+), 106 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index c3b05cb1182..8c60019fccb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -289,7 +289,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
// do not enable cache if the emit final strategy is used
if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
}
+
return builder;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 70c75b4c82e..5ca6b911b7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -258,7 +258,9 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} else {
builder.withLoggingDisabled();
}
- if (materialized.cachingEnabled()) {
+
+ // do not enable cache if the emit final strategy is used
+ if (materialized.cachingEnabled() && emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
builder.withCachingEnabled();
} else {
builder.withCachingDisabled();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index 2ac25277ba8..3ce00bcb8a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -102,8 +102,8 @@ public class PrefixedSessionKeySchemas {
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
final Bytes binaryKeyTo,
- final long from,
- final long to,
+ final long earliestWindowEndTime,
+ final long latestWindowStartTime,
final boolean forward) {
return iterator -> {
while (iterator.hasNext()) {
@@ -120,13 +120,13 @@ public class PrefixedSessionKeySchemas {
// We can return false directly here since keys are sorted by end time and if
// we get time smaller than `from`, there won't be time within range.
- if (!forward && endTime < from) {
+ if (!forward && endTime < earliestWindowEndTime) {
return false;
}
if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
&& (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
- && endTime >= from && startTime <= to) {
+ && endTime >= earliestWindowEndTime && startTime <= latestWindowStartTime) {
return true;
}
iterator.next();
@@ -137,10 +137,10 @@ public class PrefixedSessionKeySchemas {
@Override
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
- final long from,
- final long to,
+ final long earliestWindowEndTime,
+ final long latestWindowStartTime,
final boolean forward) {
- return segments.segments(from, Long.MAX_VALUE, forward);
+ return segments.segments(earliestWindowEndTime, Long.MAX_VALUE, forward);
}
static long extractStartTimestamp(final byte[] binaryKey) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 172d3218818..59e255443c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -63,12 +63,12 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksD
}
public byte[] fetchSession(final Bytes key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
+ final long sessionStartTime,
+ final long sessionEndTime) {
return get(TimeFirstSessionKeySchema.toBinary(
key,
- earliestSessionEndTime,
- latestSessionStartTime
+ sessionStartTime,
+ sessionEndTime
));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index deb6028ef68..62a874f06c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -122,12 +122,12 @@ public class RocksDBTimeOrderedSessionStore
@Override
public byte[] fetchSession(final Bytes key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
+ final long sessionStartTime,
+ final long sessiontEndTime) {
return wrapped().fetchSession(
key,
- earliestSessionEndTime,
- latestSessionStartTime
+ sessionStartTime,
+ sessiontEndTime
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0641392b2a3..3644e8eaa6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -91,6 +91,7 @@ import static org.hamcrest.Matchers.hasEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -827,6 +828,143 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
}
}
+ @Test
+ public void shouldFetchSessionForSingleKey() {
+ // Only for TimeFirstSessionKeySchema schema
+ if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+ return;
+ }
+
+ final String keyA = "a";
+ final String keyB = "b";
+ final String keyC = "c";
+
+ final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+ final Bytes key1 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA));
+ final Bytes key2 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB));
+ final Bytes key3 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC));
+
+ final byte[] expectedValue1 = serializeValue(10);
+ final byte[] expectedValue2 = serializeValue(50);
+ final byte[] expectedValue3 = serializeValue(100);
+ final byte[] expectedValue4 = serializeValue(200);
+
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), expectedValue1);
+ bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), expectedValue2);
+ bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
+ bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
+
+ final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
+ key1, windows[0].start(), windows[0].end());
+ assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
+
+ final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
+ key1, windows[1].start(), windows[1].end());
+ assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
+
+ final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
+ key2, windows[2].start(), windows[2].end());
+ assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
+
+ final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
+ key3, windows[3].start(), windows[3].end());
+ assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
+
+ final byte[] noValue = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
+ key3, 2000, 3000);
+ assertNull(noValue);
+ }
+
+ @Test
+ public void shouldFetchSessionForTimeRange() {
+ // Only for TimeFirstSessionKeySchema schema
+ if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+ return;
+ }
+ final String keyA = "a";
+ final String keyB = "b";
+ final String keyC = "c";
+
+ final Window[] sessionWindows = new Window[4];
+ sessionWindows[0] = new SessionWindow(100L, 100L);
+ sessionWindows[1] = new SessionWindow(50L, 200L);
+ sessionWindows[2] = new SessionWindow(200L, 300L);
+ bytesStore.put(serializeKey(new Windowed<>(keyA, sessionWindows[0])), serializeValue(10));
+ bytesStore.put(serializeKey(new Windowed<>(keyB, sessionWindows[1])), serializeValue(100));
+ bytesStore.put(serializeKey(new Windowed<>(keyC, sessionWindows[2])), serializeValue(200));
+
+
+ // Fetch point
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 100L)) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ // Fetch partial boundary
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 200L)) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ // Fetch partial
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 201L)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ // Fetch partial
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 199L)) {
+ assertTrue(toList(values).isEmpty());
+ }
+
+ // Fetch all boundary
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 300L)) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
+ KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ // Fetch all
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 301L)) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
+ KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+
+ // Fetch all
+ try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 299L)) {
+
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+ );
+
+ assertEquals(expected, toList(values));
+ }
+ }
+
@Test
public void shouldSkipAndRemoveDanglingIndex() {
final String keyA = "a";
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 6e93f6a7ba1..78d7f08ee84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -32,7 +33,9 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -60,6 +63,7 @@ import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -73,6 +77,13 @@ public abstract class AbstractSessionBytesStoreTest {
static final long SEGMENT_INTERVAL = 60_000L;
static final long RETENTION_PERIOD = 10_000L;
+ enum StoreType {
+ RocksDBSessionStore,
+ RocksDBTimeOrderedSessionStoreWithIndex,
+ RocksDBTimeOrderedSessionStoreWithoutIndex,
+ InMemoryStore
+ }
+
SessionStore<String, Long> sessionStore;
private MockRecordCollector recordCollector;
@@ -83,6 +94,8 @@ public abstract class AbstractSessionBytesStoreTest {
final Serde<K> keySerde,
final Serde<V> valueSerde);
+ abstract StoreType getStoreType();
+
@Before
public void setUp() {
sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
@@ -179,6 +192,75 @@ public abstract class AbstractSessionBytesStoreTest {
}
}
+ @Test
+ public void shouldFindSessionsForTimeRange() {
+ sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 5L);
+
+ if (getStoreType() == StoreType.RocksDBSessionStore) {
+ assertThrows(
+ "This API is not supported by this implementation of SessionStore.",
+ UnsupportedOperationException.class,
+ () -> sessionStore.findSessions(0, 0)
+ );
+ return;
+ }
+
+ // Find point
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 0)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L)
+ );
+ assertEquals(expected, toList(values));
+ }
+
+ sessionStore.put(new Windowed<>("b", new SessionWindow(10, 20)), 10L);
+ sessionStore.put(new Windowed<>("c", new SessionWindow(30, 40)), 20L);
+
+ // Find boundary
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 20)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L),
+ KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L)
+ );
+ assertEquals(expected, toList(values));
+ }
+
+ // Find left boundary
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 19)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L)
+ );
+ assertEquals(expected, toList(values));
+ }
+
+ // Find right boundary
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(1, 20)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L)
+ );
+ assertEquals(expected, toList(values));
+ }
+
+ // Find partial off by 1
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(19, 41)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L),
+ KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 20L)
+ );
+ assertEquals(expected, toList(values));
+ }
+
+ // Find all boundary
+ try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 40)) {
+ final List<KeyValue<Windowed<String>, Long>> expected = asList(
+ KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L),
+ KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L),
+ KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 20L)
+ );
+ assertEquals(expected, toList(values));
+ }
+ }
+
@Test
public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
@@ -810,4 +892,46 @@ public abstract class AbstractSessionBytesStoreTest {
);
}
}
+
+ @Test
+ public void shouldRemoveExpired() {
+ sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+ if (getStoreType() == StoreType.InMemoryStore) {
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
+ sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+
+ // Advance stream time to expire the first record
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(10, RETENTION_PERIOD)), 4L);
+ } else {
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
+ sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
+
+ // Advance stream time to expire the first record
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L);
+ }
+
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
+ ) {
+ assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
+ }
+ }
+
+ @Test
+ public void shouldMatchPositionAfterPut() {
+ final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
+ final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
+ final SessionStore wrapped = (SessionStore) changeLoggingSessionBytesStore.wrapped();
+
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L);
+ context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 3L);
+
+ final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
+ final Position actual = sessionStore.getPosition();
+ assertThat(expected, is(actual));
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
index 7821e2c0216..8546c546716 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
@@ -16,12 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
@@ -31,13 +28,9 @@ import java.util.Arrays;
import java.util.HashSet;
import static java.time.Duration.ofMillis;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.hamcrest.Matchers.is;
public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest {
@@ -55,20 +48,8 @@ public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest {
valueSerde).build();
}
- @Test
- public void shouldRemoveExpired() {
- sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
- sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
- sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
-
- // Advance stream time to expire the first record
- sessionStore.put(new Windowed<>("aa", new SessionWindow(10, RETENTION_PERIOD)), 4L);
-
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
- sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
- ) {
- assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
- }
+ StoreType getStoreType() {
+ return StoreType.InMemoryStore;
}
@Test
@@ -90,22 +71,4 @@ public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest {
assertFalse(sessionStore.findSessions("a", "b", 0L, 20L).hasNext());
}
- @Test
- public void shouldMatchPositionAfterPut() {
- final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
- final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
- final InMemorySessionStore inMemorySessionStore = (InMemorySessionStore) changeLoggingSessionBytesStore.wrapped();
-
- context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
- context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L);
- context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 3L);
-
- final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
- final Position actual = inMemorySessionStore.getPosition();
- assertThat(expected, is(actual));
- }
-
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index b3a749a8a37..8a849d86bcb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -17,44 +17,27 @@
package org.apache.kafka.streams.state.internals;
import java.util.Collection;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
-import java.util.HashSet;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
-import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
private static final String STORE_NAME = "rocksDB session store";
- enum StoreType {
- RocksDBSessionStore,
- RocksDBTimeOrderedSessionStoreWithIndex,
- RocksDBTimeOrderedSessionStoreWithoutIndex
- }
@Parameter
public StoreType storeType;
@Parameterized.Parameters(name = "{0}")
- public static Collection<Object[]> getKeySchema() {
+ public static Collection<Object[]> getParamStoreType() {
return asList(new Object[][] {
{StoreType.RocksDBSessionStore},
{StoreType.RocksDBTimeOrderedSessionStoreWithIndex},
@@ -62,6 +45,11 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
});
}
+ @Override
+ StoreType getStoreType() {
+ return storeType;
+ }
+
@Override
<K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
final Serde<K> keySerde,
@@ -102,37 +90,4 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
}
}
- @Test
- public void shouldRemoveExpired() {
- sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
- sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
- sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
-
- // Advance stream time to expire the first record
- sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L);
-
- try (final KeyValueIterator<Windowed<String>, Long> iterator =
- sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
- ) {
- assertEquals(valuesToSet(iterator), new HashSet<>(asList(2L, 3L, 4L)));
- }
- }
-
- @Test
- public void shouldMatchPositionAfterPut() {
- final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
- final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
- final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) changeLoggingSessionBytesStore.wrapped();
-
- context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
- context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
- context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
- sessionStore.put(new Windowed<String>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
-
- final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
- final Position actual = rocksDBSessionStore.getPosition();
- assertEquals(expected, actual);
- }
}
\ No newline at end of file