You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/10/13 01:40:10 UTC
[kafka] branch trunk updated: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 07c10024890 KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)
07c10024890 is described below
commit 07c10024890c0761ba47ccd4d6301e8101d8c8de
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Thu Oct 13 07:09:58 2022 +0530
KAFKA-12960: Enforcing strict retention time for WindowStore and Sess… (#11211)
WindowStore and SessionStore do not enforce a strict retention time in general. We should consider making retention time strict: even if some records are still physically present in the store (due to the segmented implementation), we might want to filter expired records on read. This might benefit PAPI users.
This PR adds the filtering behaviour in the Metered store so that it is applied automatically in cases where a custom state store is implemented.
Reviewers: Luke Chen <sh...@gmail.com>, A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <mj...@apache.org>
---
...stractDualSchemaRocksDBSegmentedBytesStore.java | 47 ++++-
.../AbstractRocksDBSegmentedBytesStore.java | 75 ++++++--
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 70 +++++---
.../internals/RocksDBSegmentedBytesStore.java | 2 +-
.../RocksDBTimestampedSegmentedBytesStore.java | 2 +-
.../TimeWindowedKStreamIntegrationTest.java | 66 ++++---
.../KStreamSlidingWindowAggregateTest.java | 18 +-
.../internals/KStreamWindowAggregateTest.java | 61 ++++---
.../internals/SessionWindowedKStreamImplTest.java | 63 +++++--
.../internals/TimeWindowedKStreamImplTest.java | 121 +++++++++----
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 190 +++++++++++++--------
.../AbstractRocksDBSegmentedBytesStoreTest.java | 163 +++++++++---------
.../internals/AbstractWindowBytesStoreTest.java | 4 +-
.../CachingPersistentWindowStoreTest.java | 3 +-
.../state/internals/MeteredSessionStoreTest.java | 49 ++++++
.../MeteredTimestampedWindowStoreTest.java | 1 +
.../state/internals/MeteredWindowStoreTest.java | 14 ++
.../state/internals/RocksDBSessionStoreTest.java | 93 ++++++++++
.../state/internals/RocksDBWindowStoreTest.java | 143 ++++++++++++----
19 files changed, 835 insertions(+), 350 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 95c1d8d8c81..b446a52eb5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final AbstractSegments<S> segments;
protected final KeySchema baseKeySchema;
protected final Optional<KeySchema> indexKeySchema;
+ private final long retentionPeriod;
protected ProcessorContext context;
@@ -66,22 +67,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema,
- final AbstractSegments<S> segments) {
+ final AbstractSegments<S> segments,
+ final long retentionPeriod) {
this.name = name;
this.baseKeySchema = baseKeySchema;
this.indexKeySchema = indexKeySchema;
this.segments = segments;
+ this.retentionPeriod = retentionPeriod;
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
+
+ final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
final List<S> searchSpace = segments.allSegments(true);
- final Bytes from = baseKeySchema.lowerRange(null, 0);
+ final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
from,
to,
true);
@@ -89,13 +95,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
+
+ final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
final List<S> searchSpace = segments.allSegments(false);
- final Bytes from = baseKeySchema.lowerRange(null, 0);
+ final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
from,
to,
false);
@@ -119,6 +128,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue);
+ // isTimeFirstWindowSchema == true implies ON_WINDOW_CLOSE semantics. There is an edge case
+ // when retentionPeriod == grace period: if we added 1 here, actualFrom would be greater
+ // than `to`, which would lead to no records being returned.
+ protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) {
+ return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) :
+ Math.max(from, observedStreamTime - retentionPeriod + 1);
+
+ }
+
// For testing
void putIndex(final Bytes indexKey, final byte[] value) {
if (!hasIndex()) {
@@ -191,7 +209,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public byte[] get(final Bytes rawKey) {
- final S segment = segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
+ final long timestampFromRawKey = baseKeySchema.segmentTimestamp(rawKey);
+ // check if timestamp is expired
+
+ if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
+ if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
+ LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+ rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod);
+ return null;
+ }
+ } else {
+ if (timestampFromRawKey < observedStreamTime - retentionPeriod + 1) {
+ LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+ rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod + 1);
+ return null;
+ }
+ }
+
+ final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
if (segment == null) {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 13f914d075a..bcfe30b30e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
+ private final long retentionPeriod;
private final KeySchema keySchema;
private ProcessorContext context;
@@ -65,10 +66,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
+ final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
+ this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
}
@@ -91,19 +94,30 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final long from,
final long to,
final boolean forward) {
- final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
+ final long actualFrom = getActualFrom(from);
- final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+ if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+ LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
+
+ final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(key, key, from, to, forward),
+ keySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
}
+ private long getActualFrom(final long from) {
+ return Math.max(from, observedStreamTime - retentionPeriod + 1);
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
@@ -133,14 +147,21 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
return KeyValueIterators.emptyIterator();
}
- final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
+ final long actualFrom = getActualFrom(from);
- final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+ if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+ LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
+
+ final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+ keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@@ -148,11 +169,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> all() {
- final List<S> searchSpace = segments.allSegments(true);
+ final long actualFrom = getActualFrom(0);
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
+ keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
null,
null,
true);
@@ -160,11 +182,13 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
- final List<S> searchSpace = segments.allSegments(false);
+ final long actualFrom = getActualFrom(0);
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
+ keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
null,
null,
false);
@@ -173,11 +197,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
- final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);
+ final long actualFrom = getActualFrom(timeFrom);
+
+ if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+ LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo, true);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
+ keySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
null,
null,
true);
@@ -186,11 +217,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
- final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);
+ final long actualFrom = getActualFrom(timeFrom);
+
+ if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+ LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo, false);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
+ keySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
null,
null,
false);
@@ -234,7 +272,14 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public byte[] get(final Bytes key) {
- final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+ final long timestampFromKey = keySchema.segmentTimestamp(key);
+ // check if timestamp is expired
+ if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
+ LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
+ key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
+ return null;
+ }
+ final S segment = segments.getSegmentForTimestamp(timestampFromKey);
if (segment == null) {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index 0398f0ca060..f8217c6d066 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -87,7 +87,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
cachedValue = get(baseKey);
if (cachedValue == null) {
- // Key not in base store, inconsistency happened and remove from index.
+ // Key is not in the base store or has expired; this is an inconsistency, so remove it from the index.
indexIterator.next();
AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key);
} else {
@@ -118,7 +118,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema) {
super(name, baseKeySchema, indexKeySchema,
- new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+ new KeyValueSegments(name, metricsScope, retention, segmentInterval), retention);
}
@Override
@@ -141,28 +141,38 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final long from,
final long to,
final boolean forward) {
+
+ final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
+ final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+ forward);
- final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from);
+ final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
- indexKeySchema.get().hasNextCondition(key, key, from, to, forward),
+ indexKeySchema.get().hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward));
}
- final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward);
- final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
+ final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+ forward);
+
+ final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(key, key, from, to, forward),
+ baseKeySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@@ -197,30 +207,36 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return KeyValueIterators.emptyIterator();
}
+ final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to,
+ final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
- final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, from);
+ final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
- indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from, to, forward),
+ indexKeySchema.get().hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward));
}
- final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to,
+ final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
forward);
- final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, from);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+ baseKeySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@@ -235,13 +251,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
- final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, true);
- final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+ final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, true);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
binaryFrom,
binaryTo,
true);
@@ -250,13 +273,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
- final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, false);
- final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+ final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, false);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
binaryFrom,
binaryTo,
false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 6c72fa64c5f..e7b7198d1cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+ super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 7fd958c2c27..39f493c761b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
+ super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index e0ec6c9bbef..752bbfe4b63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -64,6 +64,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.Collections;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
@@ -195,22 +196,25 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // on window close
+ // observedStreamTime : 10, retentionPeriod: 10, actualFrom: 0, timeTo: 0, timeFrom: 0
+ // observedStreamTime : 15, retentionPeriod: 10, actualFrom: 5, timeTo: 5, timeFrom: 1
+ // observedStreamTime : 25, retentionPeriod: 10, actualFrom: 15, timeTo: 15, timeFrom: 6
+
final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
- new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
- new StringDeserializer(),
- 10L,
- String.class,
- emitFinal ? 6 : 12);
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ emitFinal ? 4 : 12);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
expectResult = asList(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
);
} else {
expectResult = asList(
@@ -260,20 +264,22 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // on window close
+ // observedStreamTime : 15, retentionPeriod: 15, actualFrom: 0, timeTo: 0, timeFrom: 0
+ // observedStreamTime : 25, retentionPeriod: 15, actualFrom: 10, timeTo: 10, timeFrom: 1
+
final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
- emitFinal ? 6 : 13);
+ emitFinal ? 4 : 13);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6),
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15)
);
@@ -342,12 +348,13 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // ON_WINDOW_CLOSE expires all records.
List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
- emitFinal ? 5 : 9);
+ emitFinal ? 4 : 9);
List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
@@ -358,8 +365,6 @@ public class TimeWindowedKStreamIntegrationTest {
5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2",
11),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)),
- "0+L2,R2+L2,R2", 15),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)),
"0+L2,R2", 15)
);
@@ -403,22 +408,27 @@ public class TimeWindowedKStreamIntegrationTest {
// Restart
startStreams();
- windowedMessages = receiveMessagesWithTimestamp(
- new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
- new StringDeserializer(),
- 10L,
- String.class,
- 2);
-
if (emitFinal) {
- // Output just new closed window for C
- expectResult = asList(
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)),
- "0+L3,R3", 25),
+ windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ 1);
+
+ // Output just new/unexpired closed window for C
+ expectResult = Collections.singletonList(
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)),
"0+L3,R3", 25)
);
} else {
+ windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ 2);
+
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)),
"0+L3,R3", 35),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index df95103791d..796afd38845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -1650,22 +1650,7 @@ public class KStreamSlidingWindowAggregateTest {
final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
if (emitFinal) {
- expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
- expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
- expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
- expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
- expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
- expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
- expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
- expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
- expected.put(16L, ValueAndTimestamp.make("CD", 18L));
- expected.put(17L, ValueAndTimestamp.make("D", 18L));
- expected.put(20L, ValueAndTimestamp.make("E", 30L));
- expected.put(30L, ValueAndTimestamp.make("EF", 40L));
- expected.put(31L, ValueAndTimestamp.make("F", 40L));
- expected.put(45L, ValueAndTimestamp.make("G", 55L));
- expected.put(46L, ValueAndTimestamp.make("GH", 56L));
- expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
+ // only non-expired records
expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
@@ -1675,6 +1660,7 @@ public class KStreamSlidingWindowAggregateTest {
expected.put(66L, ValueAndTimestamp.make("O", 76L));
expected.put(67L, ValueAndTimestamp.make("OP", 77L));
expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
+
} else {
expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 8af320ae705..124bb5593b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -286,11 +286,12 @@ public class KStreamWindowAggregateTest {
inputTopic1.pipeInput("A", "1", 20L);
processors.get(0).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14),
- new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12)
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12)
);
+
processors.get(1).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
@@ -301,18 +302,24 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 15L);
processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2)
- );
- processors.get(2).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),
- "0+1+1%0+a", 9),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),
- "0+2%0+b", 1),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c",
- 2));
+
+ if (withCache) {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2)
+ );
+ processors.get(2).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),
+ "0+1+1%0+a", 9),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),
+ "0+2%0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c",
+ 2));
+ } else {
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+ }
inputTopic2.pipeInput("A", "a", 5L);
inputTopic2.pipeInput("B", "b", 6L);
@@ -321,11 +328,23 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 21L);
processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
- new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
- );
+ if (withCache) {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
+ );
+ } else {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
+ );
+
+ }
processors.get(2).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a",
10),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 3356b37c408..03ac8ea4041 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -233,12 +233,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, Long> store = driver.getSessionStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
+
+ }
}
}
@@ -251,12 +260,21 @@ public class SessionWindowedKStreamImplTest {
final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
+
+ }
}
}
@@ -272,12 +290,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
+
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 5ac43ac8082..6aa4d17dca2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -51,6 +51,7 @@ import org.junit.Test;
import java.util.List;
import java.util.Properties;
+import java.util.Collections;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -198,6 +199,7 @@ public class TimeWindowedKStreamImplTest {
}
final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
+
if (emitFinal) {
assertEquals(
asList(
@@ -238,11 +240,26 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+ } else {
+ // without cache, we get only non-expired record from underlying store.
+ if (!emitFinal) {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+ } else {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
+ }
+ }
}
{
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
@@ -250,11 +267,24 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ // the same values and logic described above applies here as well.
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ } else {
+ if (!emitFinal) {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ } else {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ }
+ }
}
}
}
@@ -274,22 +304,37 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+ } else {
+ // without cache, we get only non-expired record from underlying store.
+ // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
+ // only 1 record is non expired and would be returned.
+ assertThat(data, equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
+ }
}
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+ // same logic/data as explained above.
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+ } else {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+ }
}
}
}
@@ -310,22 +355,36 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+ } else {
+ // without cache, we get only non-expired record from underlying store.
+ // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
+ // only 1 record is non expired and would be returned.
+ assertThat(data, equalTo(Collections
+ .singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
+ }
}
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
-
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ } else {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ }
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 3644e8eaa6d..8a44b9ab501 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -179,11 +179,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
- );
+ // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -191,11 +190,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -203,20 +199,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // key B is expired as actual from is 59001
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -226,10 +218,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // keys A and B expired as actual from is 59001
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -251,10 +241,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
+ // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -262,11 +252,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -274,21 +261,17 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ // only 1 record left as actual from is 59001 and to = 60,000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -297,11 +280,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ // only 1 record left as actual from is 59001 and to = 60,000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -854,18 +835,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
+ // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key1, windows[0].start(), windows[0].end());
- assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
+ assertNull(value1);
+ // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key1, windows[1].start(), windows[1].end());
- assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
+ assertNull(value2);
+ // expired record
+ // timestampFromRawKey = 1500 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key2, windows[2].start(), windows[2].end());
- assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
+ assertNull(value3);
+ // only non-expired record
+ // timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key3, windows[3].start(), windows[3].end());
assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
@@ -991,10 +978,26 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 1, 2000)) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
- );
+ final List<KeyValue<Windowed<String>, Long>> expected;
+
+ // actual from: observedStreamTime - retention + 1
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For windowkeyschema, actual from is 1
+ // observed stream time = 1000. Retention Period = 1000.
+ // actual from = (1000 - 1000 + 1)
+ // and search happens in the range 1-2000
+ expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
+ );
+ } else {
+ // For session key schema, actual from is 501
+ // observed stream time = 1500. Retention Period = 1000.
+ // actual from = (1500 - 1000 + 1)
+ // and search happens in the range 501-2000
+ expected = Collections.singletonList(KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L));
+ }
+
assertEquals(expected, toList(results));
}
@@ -1010,11 +1013,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
+ // actual from: observedStreamTime - retention + 1
+ // retention = 1000
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
- );
+
+ final List<KeyValue<Windowed<String>, Long>> expected;
+
+ // actual from: observedStreamTime - retention + 1
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For windowkeyschema, actual from is 1
+ // observed stream time = 1000. actual from = (1000 - 1000 + 1)
+ // and search happens in the range 1-999
+ expected = asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+ );
+ } else {
+ // For session key schema, actual from is 501
+ // observed stream time = 1500. actual from = (1500 - 1000 + 1)
+ // and search happens in the range 501-999 deeming first record as expired.
+ expected = Collections.singletonList(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+ }
assertEquals(expected, toList(results));
}
@@ -1054,15 +1073,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
+ // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // don't return expired records.
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
- KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
- ),
+ Collections.emptyList(),
results
);
+ final List<KeyValue<Windowed<String>, Long>> results1 = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 59000, 60000));
+
+ // only non expired record as actual from is 59001
+ assertEquals(
+ Collections.singletonList(
+ KeyValue.pair(new Windowed<>(key, windows[3]), 1000L)
+ ),
+ results1
+ );
+
segments.close();
}
@@ -1086,9 +1114,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+ // actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // only one record returned as actual from is 59001
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
@@ -1115,12 +1145,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
),
segmentDirs()
);
-
+ // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // key A expired as actual from is 59,001
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.backwardAll());
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L)
+ Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
);
@@ -1147,9 +1178,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
+ // For all tests, actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
+ // only 1 record fetched as actual from is 59001
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -1277,9 +1310,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
+ // after restoration, only 1 record should be returned as actual from is 59001 and the prior record is expired.
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
}
@@ -1332,10 +1365,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
+ // after restoration, only non expired segments should be returned which is one as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
@@ -1368,9 +1400,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+
+ // only non expired record as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -1407,7 +1439,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+
+ // actual from = observedStreamTime - retention + 1.
+ // retention = 1000
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For window stores, observedStreamTime = 1000 => actualFrom = 1
+ // For session stores, observedStreamTime = 1500 => actualFrom = 501 which deems
+ // the below record as expired.
+ expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ }
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 32e33860ec9..b82f544026e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -171,44 +171,30 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -217,11 +203,9 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -242,44 +226,33 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -287,12 +260,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
+ final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -306,10 +277,18 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
- final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
- );
+ final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+ /*
+ * For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for
+ * SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the
+ * fetch happens from 501-999 while for WindowKeySchema it's from 1-999.
+ */
+ if (schema instanceof SessionKeySchema) {
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+ } else {
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[0]), 10L));
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+ }
assertEquals(expected, toList(results));
}
@@ -341,16 +320,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
-
+ /*
+ * All records expired as observed stream time = 60,000 which sets actual-from to 59001(60,000 - 1000 + 1). to = 1500.
+ */
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
- KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
- ),
+ Collections.emptyList(),
results
);
-
segments.close();
}
@@ -371,11 +347,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
-
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -401,11 +378,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
-
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = 60,000.
+ */
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -529,8 +507,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 2 segments are created during restoration.
assertEquals(2, bytesStore.getSegments().size());
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -584,9 +564,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -621,9 +602,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@@ -659,11 +641,20 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 1 segments are created during restoration.
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
- final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ /*
+ * For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for
+ * SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the
+ * fetch happens from 501 to end while for WindowKeySchema it's from 1 to end.
+ */
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
- assertEquals(expected, results);
+ if (schema instanceof SessionKeySchema) {
+ assertEquals(Collections.emptyList(), results);
+ } else {
+ final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+ expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ assertEquals(expected, results);
+ }
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index e93f758c5cf..c69dedc91db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -342,7 +342,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(zero, one, two, three),
- toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
+ toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(one, two, three, four, five),
@@ -360,7 +360,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(three, two, one, zero),
- toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
+ toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(five, four, three, two, one),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 83136c33e81..72779314e85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -101,7 +101,8 @@ public class CachingPersistentWindowStoreTest {
@Before
public void setUp() {
keySchema = new WindowKeySchema();
- bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
+ // KAFKA-12960: Adding a retention of 100 ms to make all test cases work as is.
+ bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, keySchema);
underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 922608d4994..044f9484378 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -95,6 +95,7 @@ public class MeteredSessionStoreTest {
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final long START_TIMESTAMP = 24L;
private static final long END_TIMESTAMP = 42L;
+ private static final int RETENTION_PERIOD = 100;
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0, "My-Topology");
@@ -429,6 +430,54 @@ public class MeteredSessionStoreTest {
verify(innerStore);
}
+ @Test
+ public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+ .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+ .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldNotFindExpiredSessionRangeFromStore() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+ .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
+ .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
@Test
public void shouldRecordRestoreTimeOnInit() {
init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 61a1a08975d..fcad32e0063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -66,6 +66,7 @@ public class MeteredTimestampedWindowStoreTest {
ValueAndTimestamp.make("value", TIMESTAMP);
private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\0\0\0\0\0\0\0avalue".getBytes();
private static final int WINDOW_SIZE_MS = 10;
+ private static final int RETENTION_PERIOD = 100;
private InternalMockProcessorContext context;
private final TaskId taskId = new TaskId(0, 0, "My-Topology");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ca6a518eb4c..20eb5ec88a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -82,6 +83,7 @@ public class MeteredWindowStoreTest {
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final int WINDOW_SIZE_MS = 10;
+ private static final int RETENTION_PERIOD = 100;
private static final long TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
@@ -270,6 +272,18 @@ public class MeteredWindowStoreTest {
verify(innerStoreMock);
}
+ @Test
+ public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
+ expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD))
+ .andReturn(KeyValueIterators.emptyWindowStoreIterator());
+ replay(innerStoreMock);
+
+ store.init((StateStoreContext) context, store);
+ store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close;
+
+ verify(innerStoreMock);
+ }
+
@Test
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 8a849d86bcb..0f73518b112 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -17,16 +17,34 @@
package org.apache.kafka.streams.state.internals;
import java.util.Collection;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
@RunWith(Parameterized.class)
public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
@@ -90,4 +108,79 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
}
}
+
+ @Test
+ public void shouldNotFetchExpiredSessions() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - 3 * RETENTION_PERIOD, systemTime - 2 * RETENTION_PERIOD)), 1L);
+ sessionStore.put(new Windowed<>("q", new SessionWindow(systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)), 4L);
+ sessionStore.put(new Windowed<>("r", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 3L);
+ sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 2L);
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ assertEquals(mkSet(2L), valuesToSet(iterator));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.backwardFindSessions("p", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+ ) {
+ assertFalse(iterator.hasNext());
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+ ) {
+ assertFalse(iterator.hasNext());
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)
+ ) {
+ assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.backwardFindSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ }
+
+ @Test
+ public void shouldRemoveExpired() {
+ sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
+ sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
+
+ // Advance stream time to expire the first record
+ sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L);
+
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
+ ) {
+ // The 2 records with values 2L and 3L are considered expired as
+ // their end times < observed stream time - retentionPeriod + 1.
+ assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));
+ }
+ }
+
+ @Test
+ public void shouldMatchPositionAfterPut() {
+ final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
+ final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
+ final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) changeLoggingSessionBytesStore.wrapped();
+
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L);
+ context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
+ sessionStore.put(new Windowed<String>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L);
+
+ final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
+ final Position actual = rocksDBSessionStore.getPosition();
+ assertEquals(expected, actual);
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c0c7e963e6e..d2f5289a1ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -187,19 +187,40 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
),
segmentDirs(baseDir)
);
-
+ // For all tests, for WindowStore actualFrom is computed using observedStreamTime - retention + 1.
+ // while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention
+ // expired record
assertEquals(
- new HashSet<>(Collections.singletonList("zero")),
+ new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
0,
ofEpochMilli(startTime - WINDOW_SIZE),
ofEpochMilli(startTime + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("one")),
- valuesToSet(windowStore.fetch(
- 1,
- ofEpochMilli(startTime + increment - WINDOW_SIZE),
- ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 149997
+ // to = 150003
+ // actualFrom = 150001
+ // record one's timestamp is 150,000, so it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 149997
+ // to = 150003
+ // actualFrom = 150000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 1,
+ ofEpochMilli(startTime + increment - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+
+ } else {
+ assertEquals(
+ new HashSet<>(Collections.singletonList("one")),
+ valuesToSet(windowStore.fetch(
+ 1,
+ ofEpochMilli(startTime + increment - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+ }
assertEquals(
new HashSet<>(Collections.singletonList("two")),
valuesToSet(windowStore.fetch(
@@ -247,12 +268,32 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
1,
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(
- 2,
- ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 179997
+ // to = 180003
+ // actualFrom = 180001
+ // record two's timestamp is 180,000, so it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 179997
+ // to = 180003
+ // actualFrom = 180000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ // expired record
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 2,
+ ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ // record not expired
+ new HashSet<>(Collections.singletonList("two")),
+ valuesToSet(windowStore.fetch(
+ 2,
+ ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+ }
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@@ -301,7 +342,8 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
assertEquals(
- new HashSet<>(Collections.singletonList("two")),
+ // expired record
+ new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
@@ -371,12 +413,24 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(
- 4,
- ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ // expired record
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ // record is not expired for the other store types
+ new HashSet<>(Collections.singletonList("four")),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+
+ }
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(
@@ -465,7 +519,14 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
- assertEquals(2, fetchedCount);
+ // 1 extra record is expired in the case of RocksDBWindowStore as
+ // actualFrom = observedStreamTime - retentionPeriod + 1. The +1
+ // isn't present for RocksDbTimeOrderedStoreWith*Index
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(1, fetchedCount);
+ } else {
+ assertEquals(2, fetchedCount);
+ }
assertEquals(
Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
@@ -480,8 +541,10 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
- assertEquals(1, fetchedCount);
+ // the latest record has a timestamp > 60k. So, the +1 in the actualFrom calculation in
+ // RocksDBWindowStore shouldn't have any implication and all stores should return the same fetched counts.
+ assertEquals(1, fetchedCount);
assertEquals(
Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
segmentDirs(baseDir)
@@ -564,6 +627,9 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
+ // For all tests, for WindowStore, actualFrom is computed using observedStreamTime - retention + 1,
+ // while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention.
+
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@@ -650,12 +716,31 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(
- 4,
- ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 239,997
+ // to = 240,003
+ // actualFrom = 240,001
+ // record four timestamp is 240,000. So, it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 239,997
+ // to = 240,003
+ // actualFrom = 240,000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ new HashSet<>(Collections.singletonList("four")),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+
+ }
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(