You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/05/12 22:32:09 UTC
[kafka] branch trunk updated: KAFKA-6455: Session Aggregation
should use window-end-time as record timestamp (#6645)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a237f5 KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
8a237f5 is described below
commit 8a237f599afa539868a138b5a2534dbf884cb4ec
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon May 13 00:31:44 2019 +0200
KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
For session-windows, the result record should have the window-end timestamp as record timestamp.
Rebased to resolve merge conflicts. Removed unused classes TupleForwarder and ForwardingCacheFlushListener (replace with TimestampedTupleForwarder, SessionTupleForwarder, TimestampedCacheFlushListerner, and SessionCacheFlushListener)
Reviewers: John Roesler <jo...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Boyang Chen <bo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../internals/KStreamSessionWindowAggregate.java | 10 +-
...istener.java => SessionCacheFlushListener.java} | 9 +-
...leForwarder.java => SessionTupleForwarder.java} | 21 +++--
.../processor/internals/ProcessorContextImpl.java | 11 ++-
.../internals/ProcessorRecordContext.java | 6 +-
.../state/internals/AbstractStoreBuilder.java | 4 +-
.../state/internals/CachingSessionStore.java | 1 -
.../state/internals/SessionStoreBuilder.java | 19 ++--
.../KStreamAggregationIntegrationTest.java | 101 ++++++++++++++-------
.../kstream/internals/KGroupedStreamImplTest.java | 95 ++++++++++++-------
...KStreamSessionWindowAggregateProcessorTest.java | 88 ++++++++++++++----
.../internals/SessionCacheFlushListenerTest.java | 52 +++++++++++
...derTest.java => SessionTupleForwarderTest.java} | 34 ++++---
.../internals/SessionWindowedKStreamImplTest.java | 93 ++++++++++++++-----
.../kstream/internals/SuppressScenarioTest.java | 37 +++++---
.../state/internals/CachingSessionStoreTest.java | 91 ++++++++++++++-----
.../state/internals/SessionStoreBuilderTest.java | 50 ++++++----
.../java/org/apache/kafka/test/MockProcessor.java | 9 ++
.../kafka/streams/scala/kstream/KTableTest.scala | 20 +++-
19 files changed, 531 insertions(+), 220 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 3168393..68dc4a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -78,7 +78,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
private SessionStore<K, Agg> store;
- private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
+ private SessionTupleForwarder<K, Agg> tupleForwarder;
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
@@ -93,7 +93,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
store = (SessionStore<K, Agg>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+ tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
}
@Override
@@ -109,10 +109,10 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
return;
}
- observedStreamTime = Math.max(observedStreamTime, context().timestamp());
+ final long timestamp = context().timestamp();
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
final long closeTime = observedStreamTime - windows.gracePeriodMs();
- final long timestamp = context().timestamp();
final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
@@ -148,7 +148,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
context().topic(),
context().partition(),
context().offset(),
- context().timestamp(),
+ timestamp,
mergedWindow.start(),
mergedWindow.end(),
closeTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
similarity index 85%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 6c6b8fd..f40fdfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -16,30 +16,31 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
-class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
+class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
private final InternalProcessorContext context;
private final ProcessorNode myNode;
- ForwardingCacheFlushListener(final ProcessorContext context) {
+ SessionCacheFlushListener(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
myNode = this.context.currentNode();
}
@Override
- public void apply(final K key,
+ public void apply(final Windowed<K> key,
final V newValue,
final V oldValue,
final long timestamp) {
final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
- context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
+ context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
} finally {
context.setCurrentNode(prev);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
similarity index 75%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
index 94b0ebd..bad255a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
/**
@@ -25,29 +28,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
* Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
* forwarding occurs in the flush listener when the cached store flushes.
*
- * @param <K> the type of the key
- * @param <V> the type of the value
+ * @param <K>
+ * @param <V>
*/
-class TupleForwarder<K, V> {
+class SessionTupleForwarder<K, V> {
private final ProcessorContext context;
private final boolean sendOldValues;
private final boolean cachingEnabled;
@SuppressWarnings("unchecked")
- TupleForwarder(final StateStore store,
- final ProcessorContext context,
- final ForwardingCacheFlushListener<K, V> flushListener,
- final boolean sendOldValues) {
+ SessionTupleForwarder(final StateStore store,
+ final ProcessorContext context,
+ final CacheFlushListener<Windowed<K>, V> flushListener,
+ final boolean sendOldValues) {
this.context = context;
this.sendOldValues = sendOldValues;
cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
- public void maybeForward(final K key,
+ public void maybeForward(final Windowed<K> key,
final V newValue,
final V oldValue) {
if (!cachingEnabled) {
- context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(key.window().end()));
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 1df6610..cd2b179 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -160,12 +160,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final V value,
final To to) {
final ProcessorNode previousNode = currentNode();
- final long currentTimestamp = recordContext.timestamp();
+ final ProcessorRecordContext previousContext = recordContext;
try {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
- recordContext.setTimestamp(toInternal.timestamp());
+ recordContext = new ProcessorRecordContext(
+ toInternal.timestamp(),
+ recordContext.offset(),
+ recordContext.partition(),
+ recordContext.topic(),
+ recordContext.headers());
}
final String sendTo = toInternal.child();
@@ -183,7 +188,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
forward(child, key, value);
}
} finally {
- recordContext.setTimestamp(currentTimestamp);
+ recordContext = previousContext;
setCurrentNode(previousNode);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cc512ae..1b22482 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,7 +29,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class ProcessorRecordContext implements RecordContext {
- private long timestamp;
+ private final long timestamp;
private final long offset;
private final String topic;
private final int partition;
@@ -48,10 +48,6 @@ public class ProcessorRecordContext implements RecordContext {
this.headers = headers;
}
- public void setTimestamp(final long timestamp) {
- this.timestamp = timestamp;
- }
-
@Override
public long offset() {
return offset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index 898db9e..fd8267b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -38,8 +38,8 @@ abstract public class AbstractStoreBuilder<K, V, T extends StateStore> implement
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
- Objects.requireNonNull(name, "name can't be null");
- Objects.requireNonNull(time, "time can't be null");
+ Objects.requireNonNull(name, "name cannot be null");
+ Objects.requireNonNull(time, "time cannot be null");
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index adbaf4c..d48f540 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
-
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
index a40eab3..51ef319 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
+import java.util.Objects;
+
public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> {
@@ -31,26 +33,25 @@ public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, Sessio
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
- super(storeSupplier.name(), keySerde, valueSerde, time);
+ super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
this.storeSupplier = storeSupplier;
}
@Override
public SessionStore<K, V> build() {
- return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
- storeSupplier.metricsScope(),
- keySerde,
- valueSerde,
- time);
+ return new MeteredSessionStore<>(
+ maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ storeSupplier.metricsScope(),
+ keySerde,
+ valueSerde,
+ time);
}
private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
- return new CachingSessionStore(
- inner,
- storeSupplier.segmentIntervalMs());
+ return new CachingSessionStore(inner, storeSupplier.segmentIntervalMs());
}
private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a13408e..ea3695e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -57,7 +56,6 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
@@ -154,7 +152,7 @@ public class KStreamAggregationIntegrationTest {
public void shouldReduce() throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream
- .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce-by-key"))
+ .reduce(reducer, Materialized.as("reduce-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
@@ -167,7 +165,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
10);
- Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+ results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
KeyValue.pair("A", "A:A"),
@@ -226,7 +224,7 @@ public class KStreamAggregationIntegrationTest {
comparator =
Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
- Collections.sort(windowedOutput, comparator);
+ windowedOutput.sort(comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
@@ -267,7 +265,7 @@ public class KStreamAggregationIntegrationTest {
groupedStream.aggregate(
initializer,
aggregator,
- Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregate-by-selected-key"))
+ Materialized.as("aggregate-by-selected-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
@@ -280,7 +278,7 @@ public class KStreamAggregationIntegrationTest {
new IntegerDeserializer(),
10);
- Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+ results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1),
@@ -335,7 +333,7 @@ public class KStreamAggregationIntegrationTest {
comparator =
Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
- Collections.sort(windowedMessages, comparator);
+ windowedMessages.sort(comparator);
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
@@ -381,7 +379,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
- Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+ results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1L),
@@ -436,7 +434,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
- Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+ results.sort(KStreamAggregationIntegrationTest::compare);
final long window = timestamp / 500 * 500;
assertThat(results, is(Arrays.asList(
@@ -457,13 +455,12 @@ public class KStreamAggregationIntegrationTest {
@Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
-
- final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
new KeyValue<>("penny", "start"),
new KeyValue<>("jo", "pause"),
new KeyValue<>("emily", "pause"));
+ final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
t1Messages,
@@ -473,7 +470,6 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t1);
-
final long t2 = t1 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -499,7 +495,6 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t3);
-
final long t4 = t3 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -513,9 +508,21 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t4);
+ final long t5 = t4 - 1;
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ userSessionsStream,
+ Collections.singletonList(
+ new KeyValue<>("jo", "late") // jo has late arrival
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t5);
final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
- final CountDownLatch latch = new CountDownLatch(11);
+ final CountDownLatch latch = new CountDownLatch(13);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -543,10 +550,11 @@ public class KStreamAggregationIntegrationTest {
startStreams();
latch.await(30, TimeUnit.SECONDS);
+
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
- assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
+ assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair(2L, t4)));
assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
@@ -555,13 +563,12 @@ public class KStreamAggregationIntegrationTest {
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
-
- final long t1 = mockTime.milliseconds();
final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
new KeyValue<>("penny", "start"),
new KeyValue<>("jo", "pause"),
new KeyValue<>("emily", "pause"));
+ final long t1 = mockTime.milliseconds();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
t1Messages,
@@ -571,7 +578,6 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t1);
-
final long t2 = t1 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -597,7 +603,6 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t3);
-
final long t4 = t3 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -611,40 +616,65 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
new Properties()),
t4);
+ final long t5 = t4 - 1;
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ userSessionsStream,
+ Collections.singletonList(
+ new KeyValue<>("jo", "late") // jo has late arrival
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t5);
- final Map<Windowed<String>, String> results = new HashMap<>();
- final CountDownLatch latch = new CountDownLatch(11);
+ final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
+ final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
.toStream()
- .foreach((key, value) -> {
- results.put(key, value);
+ .transform(() -> new Transformer<Windowed<String>, String, KeyValue<Object, Object>>() {
+ private ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public KeyValue<Object, Object> transform(final Windowed<String> key, final String value) {
+ results.put(key, KeyValue.pair(value, context.timestamp()));
latch.countDown();
- });
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ });
startStreams();
latch.await(30, TimeUnit.SECONDS);
- final ReadOnlySessionStore<String, String> sessionStore =
- kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
// verify correct data received
- assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
- assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start"));
- assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause"));
- assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume"));
- assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume"));
- assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo("pause:resume"));
- assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop"));
+ assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1)));
+ assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1)));
+ assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("pause", t1)));
+ assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair("resume:late", t4)));
+ assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair("pause:resume", t2)));
+ assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4)));
+ assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3)));
// verify can query data via IQ
+ final ReadOnlySessionStore<String, String> sessionStore =
+ kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
assertFalse(bob.hasNext());
-
}
@Test
@@ -727,6 +757,7 @@ public class KStreamAggregationIntegrationTest {
});
startStreams();
assertTrue(latch.await(30, TimeUnit.SECONDS));
+
assertThat(results.get(new Windowed<>("bob", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(2L, t4)));
assertThat(results.get(new Windowed<>("penny", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t3)));
assertThat(results.get(new Windowed<>("jo", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t4)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index acdbb39..97d1566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -153,23 +155,31 @@ public class KGroupedStreamImplTest {
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
}
- private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
+ private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
}
- assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
- assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
- assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+ final Map<Windowed<String>, ValueAndTimestamp<Integer>> result
+ = supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+ assertEquals(
+ ValueAndTimestamp.make(2, 30L),
+ result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+ assertEquals(
+ ValueAndTimestamp.make(1, 15L),
+ result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+ assertEquals(
+ ValueAndTimestamp.make(3, 100L),
+ result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
}
@Test
public void shouldAggregateSessionWindows() {
- final Map<Windowed<String>, Integer> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, Integer> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.aggregate(
@@ -179,15 +189,15 @@ public class KGroupedStreamImplTest {
Materialized
.<String, Integer, SessionStore<Bytes, byte[]>>as("session-store").
withValueSerde(Serdes.Integer()));
- table.toStream().foreach(results::put);
+ table.toStream().process(supplier);
- doAggregateSessionWindows(results);
+ doAggregateSessionWindows(supplier);
assertEquals(table.queryableStoreName(), "session-store");
}
@Test
public void shouldAggregateSessionWindowsWithInternalStoreName() {
- final Map<Windowed<String>, Integer> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, Integer> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.aggregate(
@@ -195,80 +205,97 @@ public class KGroupedStreamImplTest {
(aggKey, value, aggregate) -> aggregate + 1,
(aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
Materialized.with(null, Serdes.Integer()));
- table.toStream().foreach(results::put);
+ table.toStream().process(supplier);
- doAggregateSessionWindows(results);
+ doAggregateSessionWindows(supplier);
}
- private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
+ private void doCountSessionWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
+
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
}
- assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
- assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
- assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+ final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+ assertEquals(
+ ValueAndTimestamp.make(2L, 30L),
+ result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+ assertEquals(
+ ValueAndTimestamp.make(1L, 15L),
+ result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+ assertEquals(
+ ValueAndTimestamp.make(3L, 100L),
+ result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
}
@Test
public void shouldCountSessionWindows() {
- final Map<Windowed<String>, Long> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, Long> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.count(Materialized.as("session-store"));
- table.toStream().foreach(results::put);
- doCountSessionWindows(results);
+ table.toStream().process(supplier);
+ doCountSessionWindows(supplier);
assertEquals(table.queryableStoreName(), "session-store");
}
@Test
public void shouldCountSessionWindowsWithInternalStoreName() {
- final Map<Windowed<String>, Long> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, Long> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.count();
- table.toStream().foreach(results::put);
- doCountSessionWindows(results);
+ table.toStream().process(supplier);
+ doCountSessionWindows(supplier);
assertNull(table.queryableStoreName());
}
- private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
+ private void doReduceSessionWindows(final MockProcessorSupplier<Windowed<String>, String> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 100));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 90));
}
- assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
- assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
- assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+ final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+ assertEquals(
+ ValueAndTimestamp.make("A:B", 30L),
+ result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+ assertEquals(
+ ValueAndTimestamp.make("Z", 15L),
+ result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+ assertEquals(
+ ValueAndTimestamp.make("A:B:C", 100L),
+ result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
}
@Test
public void shouldReduceSessionWindows() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, String> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
- table.toStream().foreach(results::put);
- doReduceSessionWindows(results);
+ table.toStream().process(supplier);
+ doReduceSessionWindows(supplier);
assertEquals(table.queryableStoreName(), "session-store");
}
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
final KTable<Windowed<String>, String> table = groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
.reduce((value1, value2) -> value1 + ":" + value2);
- table.toStream().foreach(results::put);
- doReduceSessionWindows(results);
+ table.toStream().process(supplier);
+ doReduceSessionWindows(supplier);
assertNull(table.queryableStoreName());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f53d9f3..2ea7700 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -67,6 +69,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private static final long GAP_MS = 5 * 60 * 1000L;
private static final String STORE_NAME = "session-store";
+ private final ToInternal toInternal = new ToInternal();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
@@ -78,7 +81,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
aggregator,
sessionMerger);
- private final List<KeyValue> results = new ArrayList<>();
+ private final List<KeyValueTimestamp> results = new ArrayList<>();
private final Processor<String, String> processor = sessionAggregator.get();
private SessionStore<String, Long> sessionStore;
private InternalMockProcessorContext context;
@@ -100,7 +103,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
) {
@Override
public <K, V> void forward(final K key, final V value, final To to) {
- results.add(KeyValue.pair(key, value));
+ toInternal.update(to);
+ results.add(new KeyValueTimestamp<>(key, value, toInternal.timestamp()));
}
};
@@ -195,9 +199,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionStore.flush();
assertEquals(
Arrays.asList(
- KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)),
- KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(time, time)), new Change<>(3L, null))
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+ new Change<>(2L, null),
+ GAP_MS + 1),
+ new KeyValueTimestamp<>(
+ new Windowed<>(sessionId, new SessionWindow(time, time)),
+ new Change<>(3L, null),
+ time)
),
results
);
@@ -244,13 +257,34 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertEquals(
Arrays.asList(
- KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)),
- KeyValue.pair(new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)),
- KeyValue.pair(new Windowed<>("c", new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null))
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("b", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("c", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
+ new Change<>(2L, null),
+ GAP_MS / 2),
+ new KeyValueTimestamp<>(
+ new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+ new Change<>(1L, null),
+ GAP_MS + 1),
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)),
+ new Change<>(2L, null),
+ GAP_MS + 1 + GAP_MS / 2),
+ new KeyValueTimestamp<>(new Windowed<>(
+ "c",
+ new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null),
+ GAP_MS + 1 + GAP_MS / 2)
),
results
);
@@ -283,9 +317,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
assertEquals(
Arrays.asList(
- KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null))
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("b", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("c", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L)
),
results
);
@@ -302,9 +345,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
processor.process("a", "1");
assertEquals(
Arrays.asList(
- KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
- KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(null, null)),
- KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 5)), new Change<>(2L, null))
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ new Change<>(1L, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ new Change<>(null, null),
+ 0L),
+ new KeyValueTimestamp<>(
+ new Windowed<>("a", new SessionWindow(0, 5)),
+ new Change<>(2L, null),
+ 5L)
),
results
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
new file mode 100644
index 0000000..b25febf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class SessionCacheFlushListenerTest {
+ @Test
+ public void shouldForwardKeyNewValueOldValueAndTimestamp() {
+ final InternalProcessorContext context = mock(InternalProcessorContext.class);
+ expect(context.currentNode()).andReturn(null).anyTimes();
+ context.setCurrentNode(null);
+ context.setCurrentNode(null);
+ context.forward(
+ new Windowed<>("key", new SessionWindow(21L, 73L)),
+ new Change<>("newValue", "oldValue"),
+ To.all().withTimestamp(73L));
+ expectLastCall();
+ replay(context);
+
+ new SessionCacheFlushListener<>(context).apply(
+ new Windowed<>("key", new SessionWindow(21L, 73L)),
+ "newValue",
+ "oldValue",
+ 42L);
+
+ verify(context);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
similarity index 67%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
index f62e826..e99c684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.Test;
@@ -27,7 +29,7 @@ import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-public class TupleForwarderTest {
+public class SessionTupleForwarderTest {
@Test
public void shouldSetFlushListenerOnWrappedStateStore() {
@@ -36,13 +38,13 @@ public class TupleForwarderTest {
}
private void setFlushListener(final boolean sendOldValues) {
- final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class);
- final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class);
+ final WrappedStateStore<StateStore, Windowed<Object>, Object> store = mock(WrappedStateStore.class);
+ final SessionCacheFlushListener<Object, Object> flushListener = mock(SessionCacheFlushListener.class);
expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
replay(store);
- new TupleForwarder<>(store, null, flushListener, sendOldValues);
+ new SessionTupleForwarder<>(store, null, flushListener, sendOldValues);
verify(store);
}
@@ -53,21 +55,27 @@ public class TupleForwarderTest {
shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
}
- private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
+ private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
final ProcessorContext context = mock(ProcessorContext.class);
- expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
- if (sendOldValues) {
- context.forward("key", new Change<>("newValue", "oldValue"));
+ expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
+ if (sendOldValued) {
+ context.forward(
+ new Windowed<>("key", new SessionWindow(21L, 42L)),
+ new Change<>("value", "oldValue"),
+ To.all().withTimestamp(42L));
} else {
- context.forward("key", new Change<>("newValue", null));
+ context.forward(
+ new Windowed<>("key", new SessionWindow(21L, 42L)),
+ new Change<>("value", null),
+ To.all().withTimestamp(42L));
}
expectLastCall();
replay(store, context);
- new TupleForwarder<>(store, context, null, sendOldValues)
- .maybeForward("key", "newValue", "oldValue");
+ new SessionTupleForwarder<>(store, context, null, sendOldValued)
+ .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
verify(store, context);
}
@@ -80,8 +88,8 @@ public class TupleForwarderTest {
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
- new TupleForwarder<>(store, context, null, false)
- .maybeForward("key", "newValue", "oldValue");
+ new SessionTupleForwarder<>(store, context, null, false)
+ .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
verify(store, context);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index b2c4ec8..d1e5448 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
@@ -32,16 +33,17 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -67,50 +69,93 @@ public class SessionWindowedKStreamImplTest {
}
@Test
- public void shouldCountSessionWindowed() {
- final Map<Windowed<String>, Long> results = new HashMap<>();
+ public void shouldCountSessionWindowedWithCachingDisabled() {
+ props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ shouldCountSessionWindowed();
+ }
+
+ @Test
+ public void shouldCountSessionWindowedWithCachingEnabled() {
+ shouldCountSessionWindowed();
+ }
+
+ private void shouldCountSessionWindowed() {
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
stream.count()
- .toStream()
- .foreach(results::put);
+ .toStream()
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
- assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
- assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
+
+ final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+ assertThat(result.size(), equalTo(3));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(10L, 15L))),
+ equalTo(ValueAndTimestamp.make(2L, 15L)));
+ assertThat(
+ result.get(new Windowed<>("2", new SessionWindow(599L, 600L))),
+ equalTo(ValueAndTimestamp.make(2L, 600L)));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(600L, 600L))),
+ equalTo(ValueAndTimestamp.make(1L, 600L)));
}
@Test
public void shouldReduceWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
stream.reduce(MockReducer.STRING_ADDER)
- .toStream()
- .foreach(results::put);
+ .toStream()
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
- assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
- assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
+
+ final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+ assertThat(result.size(), equalTo(3));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(10, 15))),
+ equalTo(ValueAndTimestamp.make("1+2", 15L)));
+ assertThat(
+ result.get(new Windowed<>("2", new SessionWindow(599L, 600))),
+ equalTo(ValueAndTimestamp.make("1+2", 600L)));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(600, 600))),
+ equalTo(ValueAndTimestamp.make("3", 600L)));
}
@Test
public void shouldAggregateSessionWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
stream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()))
- .toStream()
- .foreach(results::put);
+ .toStream()
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
- assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
- assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
+
+ final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+ assertThat(result.size(), equalTo(3));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(10, 15))),
+ equalTo(ValueAndTimestamp.make("0+0+1+2", 15L)));
+ assertThat(
+ result.get(new Windowed<>("2", new SessionWindow(599, 600))),
+ equalTo(ValueAndTimestamp.make("0+0+1+2", 600L)));
+ assertThat(
+ result.get(new Windowed<>("1", new SessionWindow(600, 600))),
+ equalTo(ValueAndTimestamp.make("0+3", 600L)));
}
@Test
@@ -126,7 +171,7 @@ public class SessionWindowedKStreamImplTest {
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
}
}
@@ -144,7 +189,7 @@ public class SessionWindowedKStreamImplTest {
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
}
}
@@ -165,7 +210,7 @@ public class SessionWindowedKStreamImplTest {
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+ KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
}
}
@@ -247,6 +292,6 @@ public class SessionWindowedKStreamImplTest {
driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 599));
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 24f222b..a7151cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -103,7 +103,8 @@ public class SuppressScenarioTest {
final Topology topology = builder.build();
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
@@ -179,7 +180,8 @@ public class SuppressScenarioTest {
.toStream()
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -248,7 +250,8 @@ public class SuppressScenarioTest {
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -311,7 +314,8 @@ public class SuppressScenarioTest {
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -370,7 +374,8 @@ public class SuppressScenarioTest {
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
@@ -420,7 +425,8 @@ public class SuppressScenarioTest {
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
@@ -475,7 +481,8 @@ public class SuppressScenarioTest {
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
- final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+ final ConsumerRecordFactory<String, String> recordFactory =
+ new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
// first window
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
@@ -492,10 +499,10 @@ public class SuppressScenarioTest {
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
- new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+ new KeyValueTimestamp<>("[k1@0/0]", null, 0L),
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
- new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
- new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+ new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
+ new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
)
@@ -503,14 +510,15 @@ public class SuppressScenarioTest {
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+ new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
)
);
}
}
- private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+ private static <K, V> void verify(final List<ProducerRecord<K, V>> results,
+ final List<KeyValueTimestamp<K, V>> expectedResults) {
if (results.size() != expectedResults.size()) {
throw new AssertionError(printRecords(results) + " != " + expectedResults);
}
@@ -525,7 +533,10 @@ public class SuppressScenarioTest {
}
}
- private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+ private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver,
+ final String topic,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
final List<ProducerRecord<K, V>> result = new LinkedList<>();
for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
next != null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 8c7325c..128cdc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -17,11 +17,13 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;
@@ -34,20 +36,18 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
+import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
@@ -74,8 +74,7 @@ public class CachingSessionStoreTest {
private CachingSessionStore cachingStore;
private ThreadCache cache;
- @Before
- public void setUp() {
+ public CachingSessionStoreTest() {
final SessionKeySchema schema = new SessionKeySchema();
final RocksDBSegmentedBytesStore root =
new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
@@ -140,7 +139,7 @@ public class CachingSessionStoreTest {
@Test
public void shouldFetchAllSessionsWithSameRecordKey() {
- final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+ final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
@@ -247,8 +246,8 @@ public class CachingSessionStoreTest {
final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1, 2));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2, 4));
final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1, 2));
- final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener =
- new CachingKeyValueStoreTest.CacheFlushListenerStub<>(
+ final CacheFlushListenerStub<Windowed<String>, String> flushListener =
+ new CacheFlushListenerStub<>(
new SessionWindowedDeserializer<>(new StringDeserializer()),
new StringDeserializer());
cachingStore.setFlushListener(flushListener, true);
@@ -257,7 +256,11 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.singletonMap(bDeserialized, new Change<>("1", null)),
+ Collections.singletonList(
+ new KeyValueTimestamp<>(
+ bDeserialized,
+ new Change<>("1", null),
+ DEFAULT_TIMESTAMP)),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -266,7 +269,11 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.singletonMap(aDeserialized, new Change<>("1", null)),
+ Collections.singletonList(
+ new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>("1", null),
+ DEFAULT_TIMESTAMP)),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -275,7 +282,11 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.singletonMap(aDeserialized, new Change<>("2", "1")),
+ Collections.singletonList(
+ new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>("2", "1"),
+ DEFAULT_TIMESTAMP)),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -284,7 +295,11 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.singletonMap(aDeserialized, new Change<>(null, "2")),
+ Collections.singletonList(
+ new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>(null, "2"),
+ DEFAULT_TIMESTAMP)),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -295,7 +310,7 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.emptyMap(),
+ Collections.emptyList(),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -305,8 +320,8 @@ public class CachingSessionStoreTest {
public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
- final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener =
- new CachingKeyValueStoreTest.CacheFlushListenerStub<>(
+ final CacheFlushListenerStub<Windowed<String>, String> flushListener =
+ new CacheFlushListenerStub<>(
new SessionWindowedDeserializer<>(new StringDeserializer()),
new StringDeserializer());
cachingStore.setFlushListener(flushListener, false);
@@ -321,11 +336,18 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- mkMap(
- mkEntry(aDeserialized, new Change<>("1", null)),
- mkEntry(aDeserialized, new Change<>("2", null)),
- mkEntry(aDeserialized, new Change<>(null, null))
- ),
+ asList(new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>("1", null),
+ DEFAULT_TIMESTAMP),
+ new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>("2", null),
+ DEFAULT_TIMESTAMP),
+ new KeyValueTimestamp<>(
+ aDeserialized,
+ new Change<>(null, null),
+ DEFAULT_TIMESTAMP)),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -336,7 +358,7 @@ public class CachingSessionStoreTest {
cachingStore.flush();
assertEquals(
- Collections.emptyMap(),
+ Collections.emptyList(),
flushListener.forwarded
);
flushListener.forwarded.clear();
@@ -466,4 +488,29 @@ public class CachingSessionStoreTest {
allSessions.add(KeyValue.pair(key, value));
}
+ public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
+ final Deserializer<K> keyDeserializer;
+ final Deserializer<V> valueDesializer;
+ final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>();
+
+ CacheFlushListenerStub(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDesializer) {
+ this.keyDeserializer = keyDeserializer;
+ this.valueDesializer = valueDesializer;
+ }
+
+ @Override
+ public void apply(final byte[] key,
+ final byte[] newValue,
+ final byte[] oldValue,
+ final long timestamp) {
+ forwarded.add(
+ new KeyValueTimestamp<>(
+ keyDeserializer.deserialize(null, key),
+ new Change<>(
+ valueDesializer.deserialize(null, newValue),
+ valueDesializer.deserialize(null, oldValue)),
+ timestamp));
+ }
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
index eb0cc55..3e3c1a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
-import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -34,8 +33,13 @@ import org.junit.runner.RunWith;
import java.util.Collections;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThrows;
@RunWith(EasyMockRunner.class)
public class SessionStoreBuilderTest {
@@ -49,15 +53,15 @@ public class SessionStoreBuilderTest {
@Before
public void setUp() throws Exception {
- EasyMock.expect(supplier.get()).andReturn(inner);
- EasyMock.expect(supplier.name()).andReturn("name");
- EasyMock.replay(supplier);
+ expect(supplier.get()).andReturn(inner);
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
- builder = new SessionStoreBuilder<>(supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime()
- );
+ builder = new SessionStoreBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
}
@Test
@@ -113,29 +117,37 @@ public class SessionStoreBuilderTest {
assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerIfInnerIsNull() {
- new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+ final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
+ assertThat(e.getMessage(), equalTo("supplier cannot be null"));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerIfKeySerdeIsNull() {
- new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+ final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
+ assertThat(e.getMessage(), equalTo("name cannot be null"));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerIfValueSerdeIsNull() {
- new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+ final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()));
+ assertThat(e.getMessage(), equalTo("name cannot be null"));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerIfTimeIsNull() {
- new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+ reset(supplier);
+ expect(supplier.name()).andReturn("name");
+ replay(supplier);
+ final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null));
+ assertThat(e.getMessage(), equalTo("time cannot be null"));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
- new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+ final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
+ assertThat(e.getMessage(), equalTo("name cannot be null"));
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index e000b82..0ff5c8a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -20,9 +20,12 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -32,6 +35,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<K> processedKeys = new ArrayList<>();
public final ArrayList<V> processedValues = new ArrayList<>();
+ public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
@@ -77,6 +81,11 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public void process(final K key, final V value) {
processedKeys.add(key);
processedValues.add(value);
+ if (value != null) {
+ lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
+ } else {
+ lastValueAndTimestampPerKey.remove(key);
+ }
processed.add(
(key == null ? "null" : key) +
":" + (value == null ? "null" : value) +
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index f2de3de..c0110a1 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -289,22 +289,34 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
}
{
- // late event for first window, included since grade period hasn't passed
+ // out-of-order event for first window, included since grade period hasn't passed
testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
}
{
+ // add to second window
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 13L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
+ // add out-of-order to second window
+ testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 10L)
+ Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+ }
+ {
// push stream time forward to flush other events through
testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
- // too-late event should get dropped from the stream
+ // late event should get dropped from the stream
testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
// should now have to results
val r1 = testDriver.readRecord[String, Long](sinkTopic)
r1.key shouldBe "0:2:k1"
r1.value shouldBe 3L
+ r1.timestamp shouldBe 2L
val r2 = testDriver.readRecord[String, Long](sinkTopic)
- r2.key shouldBe "8:8:k1"
- r2.value shouldBe 1
+ r2.key shouldBe "8:13:k1"
+ r2.value shouldBe 3L
+ r2.timestamp shouldBe 13L
}
testDriver.readRecord[String, Long](sinkTopic) shouldBe null