You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 23:48:23 UTC
[kafka] branch trunk updated: KAFKA-6455: Improve DSL operator
timestamp semantics (#6725)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a2749f KAFKA-6455: Improve DSL operator timestamp semantics (#6725)
6a2749f is described below
commit 6a2749faa63397caa93dae7bfdc3f1d0573a2ff4
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sat May 18 01:48:07 2019 +0200
KAFKA-6455: Improve DSL operator timestamp semantics (#6725)
Basic idea:
KTable-KTable join: set max(left-ts,right-ts) for result
#agg(...) (stream/table windowed/non-windowed): set max(ts1, ts2, ts3,...) of all input records that contribute to the aggregation result
for all stateless transformation: input-ts -> output-ts
Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>, Andy Coates <an...@confluent.io>, Bill Bejeck <bbejeck@gmail.com
---
.../kstream/internals/KStreamAggregate.java | 21 +-
.../internals/KStreamKTableJoinProcessor.java | 4 +-
.../streams/kstream/internals/KStreamReduce.java | 20 +-
.../internals/KStreamSessionWindowAggregate.java | 7 +-
.../kstream/internals/KStreamWindowAggregate.java | 20 +-
.../streams/kstream/internals/KTableAggregate.java | 9 +-
.../streams/kstream/internals/KTableFilter.java | 15 +-
.../kstream/internals/KTableKTableInnerJoin.java | 30 ++-
.../kstream/internals/KTableKTableLeftJoin.java | 40 ++-
.../kstream/internals/KTableKTableOuterJoin.java | 48 +++-
.../kstream/internals/KTableKTableRightJoin.java | 36 ++-
.../streams/kstream/internals/KTableMapValues.java | 16 +-
.../KTableMaterializedValueGetterSupplier.java | 8 +-
.../streams/kstream/internals/KTableReduce.java | 7 +-
.../kstream/internals/KTableRepartitionMap.java | 12 +-
.../streams/kstream/internals/KTableSource.java | 15 +-
.../internals/KTableSourceValueGetterSupplier.java | 7 +-
.../kstream/internals/KTableTransformValues.java | 8 +-
.../kstream/internals/KTableValueGetter.java | 3 +-
.../internals/TimestampedTupleForwarder.java | 10 +
.../org/apache/kafka/streams/processor/To.java | 1 +
.../state/internals/KeyValueIteratorFacade.java | 4 +-
.../internals/ReadOnlyKeyValueStoreFacade.java | 6 +-
.../state/internals/ReadOnlyWindowStoreFacade.java | 7 +-
.../integration/utils/IntegrationTestUtils.java | 14 +-
.../kstream/internals/GlobalKTableJoinsTest.java | 61 ++---
.../kstream/internals/KGroupedStreamImplTest.java | 136 +++++++----
.../kstream/internals/KGroupedTableImplTest.java | 94 +++++---
.../kstream/internals/KStreamBranchTest.java | 39 +--
.../kstream/internals/KStreamFilterTest.java | 20 +-
.../kstream/internals/KStreamForeachTest.java | 12 +-
.../internals/KStreamGlobalKTableJoinTest.java | 37 +--
.../internals/KStreamGlobalKTableLeftJoinTest.java | 45 ++--
.../streams/kstream/internals/KStreamImplTest.java | 42 ++--
.../kstream/internals/KStreamKStreamJoinTest.java | 35 +--
.../internals/KStreamKStreamLeftJoinTest.java | 7 +-
.../kstream/internals/KStreamKTableJoinTest.java | 42 ++--
.../internals/KStreamKTableLeftJoinTest.java | 45 ++--
.../streams/kstream/internals/KStreamMapTest.java | 4 +-
.../kstream/internals/KStreamMapValuesTest.java | 8 +-
.../streams/kstream/internals/KStreamPeekTest.java | 7 +-
...KStreamSessionWindowAggregateProcessorTest.java | 4 +-
.../kstream/internals/KStreamTransformTest.java | 3 +-
.../internals/KStreamTransformValuesTest.java | 8 +-
.../internals/KStreamWindowAggregateTest.java | 72 +++---
.../kstream/internals/KTableAggregateTest.java | 87 ++++---
.../kstream/internals/KTableFilterTest.java | 123 +++++-----
.../streams/kstream/internals/KTableImplTest.java | 18 +-
.../internals/KTableKTableInnerJoinTest.java | 267 ++++++++++++++++-----
.../internals/KTableKTableLeftJoinTest.java | 248 +++++++++++++------
.../internals/KTableKTableOuterJoinTest.java | 264 +++++++++++++-------
.../kstream/internals/KTableMapKeysTest.java | 4 +-
.../kstream/internals/KTableMapValuesTest.java | 113 ++++-----
.../kstream/internals/KTableReduceTest.java | 12 +-
.../kstream/internals/KTableSourceTest.java | 99 ++++----
.../internals/KTableTransformValuesTest.java | 54 +++--
.../kstream/internals/SuppressScenarioTest.java | 10 +-
.../internals/TimeWindowedKStreamImplTest.java | 131 +++++++---
.../internals/TimestampedTupleForwarderTest.java | 19 +-
.../java/org/apache/kafka/test/MockProcessor.java | 14 ++
.../streams/internals/KeyValueStoreFacade.java | 8 +-
61 files changed, 1621 insertions(+), 939 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 7d367d7..8fd4f5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -87,18 +87,21 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
T oldAgg = getValueOrNull(oldAggAndTimestamp);
+ final T newAgg;
+ final long newTimestamp;
+
if (oldAgg == null) {
oldAgg = initializer.apply();
+ newTimestamp = context().timestamp();
+ } else {
+ oldAgg = oldAggAndTimestamp.value();
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
}
- T newAgg = oldAgg;
-
- // try to add the new value
- newAgg = aggregator.apply(key, value, newAgg);
+ newAgg = aggregator.apply(key, value, oldAgg);
- // update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
+ store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
@@ -128,8 +131,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
}
@Override
- public T get(final K key) {
- return getValueOrNull(store.get(key));
+ public ValueAndTimestamp<T> get(final K key) {
+ return store.get(key);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 70b9bad..dcf0799 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
@@ -68,7 +70,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
metrics.skippedRecordsSensor().record();
} else {
final K2 mappedKey = keyMapper.apply(key, value);
- final V2 value2 = mappedKey == null ? null : valueGetter.get(mappedKey);
+ final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
if (leftJoin || value2 != null) {
context().forward(key, joiner.apply(value, value2));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index a1b9f73..58769df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -84,18 +84,20 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
- V newAgg = oldAgg;
- // try to add the new value
- if (newAgg == null) {
+ final V newAgg;
+ final long newTimestamp;
+
+ if (oldAgg == null) {
newAgg = value;
+ newTimestamp = context().timestamp();
} else {
- newAgg = reducer.apply(newAgg, value);
+ newAgg = reducer.apply(oldAgg, value);
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
}
- // update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
+ store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
@@ -125,8 +127,8 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
}
@Override
- public V get(final K key) {
- return getValueOrNull(store.get(key));
+ public ValueAndTimestamp<V> get(final K key) {
+ return store.get(key);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 68dc4a7..fdbf475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,8 +203,10 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
}
@Override
- public Agg get(final Windowed<K> key) {
- return store.fetchSession(key.key(), key.window().start(), key.window().end());
+ public ValueAndTimestamp<Agg> get(final Windowed<K> key) {
+ return ValueAndTimestamp.make(
+ store.fetchSession(key.key(), key.window().start(), key.window().end()),
+ key.window().end());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 95ae54f..3458ca0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -122,15 +122,25 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart);
Agg oldAgg = getValueOrNull(oldAggAndTimestamp);
+ final Agg newAgg;
+ final long newTimestamp;
+
if (oldAgg == null) {
oldAgg = initializer.apply();
+ newTimestamp = context().timestamp();
+ } else {
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
}
- final Agg newAgg = aggregator.apply(key, value, oldAgg);
+ newAgg = aggregator.apply(key, value, oldAgg);
// update the store with the new value
- windowStore.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()), windowStart);
- tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
+ windowStore.put(key, ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
+ tupleForwarder.maybeForward(
+ new Windowed<>(key, entry.getValue()),
+ newAgg,
+ sendOldValues ? oldAgg : null,
+ newTimestamp);
} else {
log.debug(
"Skipping record for expired window. " +
@@ -184,10 +194,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
@SuppressWarnings("unchecked")
@Override
- public Agg get(final Windowed<K> windowedKey) {
+ public ValueAndTimestamp<Agg> get(final Windowed<K> windowedKey) {
final K key = windowedKey.key();
final W window = (W) windowedKey.window();
- return getValueOrNull(windowStore.fetch(key, window.start()));
+ return windowStore.fetch(key, window.start());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 88bf867..be0a7a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -85,10 +85,12 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
final T oldAgg = getValueOrNull(oldAggAndTimestamp);
final T intermediateAgg;
+ long newTimestamp = context().timestamp();
// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
}
@@ -104,13 +106,16 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
newAgg = add.apply(key, value.newValue, initializedAgg);
+ if (oldAggAndTimestamp != null) {
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
+ }
} else {
newAgg = intermediateAgg;
}
// update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
+ store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index afa1822..6c9d452 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -61,6 +61,19 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
return newValue;
}
+ private ValueAndTimestamp<V> computeValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
+ ValueAndTimestamp<V> newValueAndTimestamp = null;
+
+ if (valueAndTimestamp != null) {
+ final V value = valueAndTimestamp.value();
+ if (filterNot ^ predicate.test(key, value)) {
+ newValueAndTimestamp = valueAndTimestamp;
+ }
+ }
+
+ return newValueAndTimestamp;
+ }
+
private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
@@ -135,7 +148,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}
@Override
- public V get(final K key) {
+ public ValueAndTimestamp<V> get(final K key) {
return computeValue(key, parentGetter.get(key));
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 58110b4..2d4cbc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -21,10 +21,14 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
@@ -87,22 +91,26 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
R newValue = null;
+ final long resultTimestamp;
R oldValue = null;
- final V2 value2 = valueGetter.get(key);
- if (value2 == null) {
+ final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(key);
+ final V2 valueRight = getValueOrNull(valueAndTimestampRight);
+ if (valueRight == null) {
return;
}
+ resultTimestamp = Math.max(context().timestamp(), valueAndTimestampRight.timestamp());
+
if (change.newValue != null) {
- newValue = joiner.apply(change.newValue, value2);
+ newValue = joiner.apply(change.newValue, valueRight);
}
if (sendOldValues && change.oldValue != null) {
- oldValue = joiner.apply(change.oldValue, value2);
+ oldValue = joiner.apply(change.oldValue, valueRight);
}
- context().forward(key, new Change<>(newValue, oldValue));
+ context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
}
@Override
@@ -129,14 +137,18 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public R get(final K key) {
- final V1 value1 = valueGetter1.get(key);
+ public ValueAndTimestamp<R> get(final K key) {
+ final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ final V1 value1 = getValueOrNull(valueAndTimestamp1);
if (value1 != null) {
- final V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
+ final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(keyValueMapper.apply(key, value1));
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 != null) {
- return joiner.apply(value1, value2);
+ return ValueAndTimestamp.make(
+ joiner.apply(value1, value2),
+ Math.max(valueAndTimestamp1.timestamp(), valueAndTimestamp2.timestamp()));
} else {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index cbb63c6..03e59d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -20,10 +20,15 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);
@@ -85,13 +90,24 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
R newValue = null;
+ final long resultTimestamp;
R oldValue = null;
- final V2 value2 = valueGetter.get(key);
- if (value2 == null && change.newValue == null && change.oldValue == null) {
- return;
+ final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(key);
+ final V2 value2 = getValueOrNull(valueAndTimestampRight);
+ final long timestampRight;
+
+ if (value2 == null) {
+ if (change.newValue == null && change.oldValue == null) {
+ return;
+ }
+ timestampRight = UNKNOWN;
+ } else {
+ timestampRight = valueAndTimestampRight.timestamp();
}
+ resultTimestamp = Math.max(context().timestamp(), timestampRight);
+
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
}
@@ -100,7 +116,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
oldValue = joiner.apply(change.oldValue, value2);
}
- context().forward(key, new Change<>(newValue, oldValue));
+ context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
}
@Override
@@ -127,12 +143,20 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public R get(final K key) {
- final V1 value1 = valueGetter1.get(key);
+ public ValueAndTimestamp<R> get(final K key) {
+ final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ final V1 value1 = getValueOrNull(valueAndTimestamp1);
if (value1 != null) {
- final V2 value2 = valueGetter2.get(key);
- return joiner.apply(value1, value2);
+ final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(key);
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
+ final long resultTimestamp;
+ if (valueAndTimestamp2 == null) {
+ resultTimestamp = valueAndTimestamp1.timestamp();
+ } else {
+ resultTimestamp = Math.max(valueAndTimestamp1.timestamp(), valueAndTimestamp2.timestamp());
+ }
+ return ValueAndTimestamp.make(joiner.apply(value1, value2), resultTimestamp);
} else {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 27eb698..d600718 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -20,10 +20,15 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableOuterJoin.class);
@@ -84,11 +89,18 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
R newValue = null;
+ final long resultTimestamp;
R oldValue = null;
- final V2 value2 = valueGetter.get(key);
- if (value2 == null && change.newValue == null && change.oldValue == null) {
- return;
+ final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.get(key);
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
+ if (value2 == null) {
+ if (change.newValue == null && change.oldValue == null) {
+ return;
+ }
+ resultTimestamp = context().timestamp();
+ } else {
+ resultTimestamp = Math.max(context().timestamp(), valueAndTimestamp2.timestamp());
}
if (value2 != null || change.newValue != null) {
@@ -99,7 +111,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
oldValue = joiner.apply(change.oldValue, value2);
}
- context().forward(key, new Change<>(newValue, oldValue));
+ context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
}
@Override
@@ -126,16 +138,36 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public R get(final K key) {
+ public ValueAndTimestamp<R> get(final K key) {
R newValue = null;
- final V1 value1 = valueGetter1.get(key);
- final V2 value2 = valueGetter2.get(key);
+
+ final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ final V1 value1;
+ final long timestamp1;
+ if (valueAndTimestamp1 == null) {
+ value1 = null;
+ timestamp1 = UNKNOWN;
+ } else {
+ value1 = valueAndTimestamp1.value();
+ timestamp1 = valueAndTimestamp1.timestamp();
+ }
+
+ final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(key);
+ final V2 value2;
+ final long timestamp2;
+ if (valueAndTimestamp2 == null) {
+ value2 = null;
+ timestamp2 = UNKNOWN;
+ } else {
+ value2 = valueAndTimestamp2.value();
+ timestamp2 = valueAndTimestamp2.timestamp();
+ }
if (value1 != null || value2 != null) {
newValue = joiner.apply(value1, value2);
}
- return newValue;
+ return ValueAndTimestamp.make(newValue, Math.max(timestamp1, timestamp2));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 1e634d0..fbddd9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -20,10 +20,14 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableRightJoin.class);
@@ -84,20 +88,26 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
final R newValue;
+ final long resultTimestamp;
R oldValue = null;
- final V2 value2 = valueGetter.get(key);
- if (value2 == null) {
+ final ValueAndTimestamp<V2> valueAndTimestampLeft = valueGetter.get(key);
+ final V2 valueLeft = getValueOrNull(valueAndTimestampLeft);
+ if (valueLeft == null) {
return;
}
- newValue = joiner.apply(change.newValue, value2);
+ resultTimestamp = Math.max(context().timestamp(), valueAndTimestampLeft.timestamp());
+
+ // joiner == "reverse joiner"
+ newValue = joiner.apply(change.newValue, valueLeft);
if (sendOldValues) {
- oldValue = joiner.apply(change.oldValue, value2);
+ // joiner == "reverse joiner"
+ oldValue = joiner.apply(change.oldValue, valueLeft);
}
- context().forward(key, new Change<>(newValue, oldValue));
+ context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
}
@Override
@@ -124,12 +134,20 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
- public R get(final K key) {
- final V2 value2 = valueGetter2.get(key);
+ public ValueAndTimestamp<R> get(final K key) {
+ final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter2.get(key);
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 != null) {
- final V1 value1 = valueGetter1.get(key);
- return joiner.apply(value1, value2);
+ final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ final V1 value1 = getValueOrNull(valueAndTimestamp1);
+ final long resultTimestamp;
+ if (valueAndTimestamp1 == null) {
+ resultTimestamp = valueAndTimestamp2.timestamp();
+ } else {
+ resultTimestamp = Math.max(valueAndTimestamp1.timestamp(), valueAndTimestamp2.timestamp());
+ }
+ return ValueAndTimestamp.make(joiner.apply(value1, value2), resultTimestamp);
} else {
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 496127c..e734457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -81,6 +81,18 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
return newValue;
}
+ private ValueAndTimestamp<V1> computeValueAndTimestamp(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
+ V1 newValue = null;
+ long timestamp = 0;
+
+ if (valueAndTimestamp != null) {
+ newValue = mapper.apply(key, valueAndTimestamp.value());
+ timestamp = valueAndTimestamp.timestamp();
+ }
+
+ return ValueAndTimestamp.make(newValue, timestamp);
+ }
+
private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V1> store;
@@ -128,8 +140,8 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
@Override
- public V1 get(final K key) {
- return computeValue(key, parentGetter.get(key));
+ public ValueAndTimestamp<V1> get(final K key) {
+ return computeValueAndTimestamp(key, parentGetter.get(key));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index a84251c..8258109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -14,13 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
private final String storeName;
@@ -48,8 +46,8 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
}
@Override
- public V get(final K key) {
- return getValueOrNull(store.get(key));
+ public ValueAndTimestamp<V> get(final K key) {
+ return store.get(key);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 3055f51..1b1a2bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -80,10 +80,12 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
final V intermediateAgg;
+ long newTimestamp = context().timestamp();
// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
}
@@ -95,14 +97,15 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
newAgg = value.newValue;
} else {
newAgg = addReducer.apply(intermediateAgg, value.newValue);
+ newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
}
} else {
newAgg = intermediateAgg;
}
// update the store with the new value
- store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
- tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
+ store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
+ tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index e86445a..a3d8b79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -22,6 +22,9 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
/**
* KTable repartition map functions are not exposed to public APIs, but only used for keyed aggregations.
@@ -101,6 +104,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
private final KTableValueGetter<K, V> parentGetter;
+ private ProcessorContext context;
KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
@@ -108,12 +112,16 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
@Override
public void init(final ProcessorContext context) {
+ this.context = context;
parentGetter.init(context);
}
@Override
- public KeyValue<K1, V1> get(final K key) {
- return mapper.apply(key, parentGetter.get(key));
+ public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key) {
+ final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
+ return ValueAndTimestamp.make(
+ mapper.apply(key, getValueOrNull(valueAndTimestamp)),
+ valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 8b6ec6e..864cf51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -28,8 +28,6 @@ import org.slf4j.LoggerFactory;
import java.util.Objects;
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -101,8 +99,17 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
if (queryableName != null) {
- final ValueAndTimestamp<V> oldValueAndTimestamp = sendOldValues ? store.get(key) : null;
- final V oldValue = getValueOrNull(oldValueAndTimestamp);
+ final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
+ final V oldValue;
+ if (oldValueAndTimestamp != null) {
+ oldValue = oldValueAndTimestamp.value();
+ if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
+ LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
+ store.name(), context().offset(), context().partition());
+ }
+ } else {
+ oldValue = null;
+ }
store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
tupleForwarder.maybeForward(key, value, oldValue);
} else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 5ec33f8..21731e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -18,8 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> {
private final String storeName;
@@ -45,8 +44,8 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
}
- public V get(final K key) {
- return getValueOrNull(store.get(key));
+ public ValueAndTimestamp<V> get(final K key) {
+ return store.get(key);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 5d81711..86da063 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Objects;
+import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
@@ -139,8 +140,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
- public V1 get(final K key) {
- return valueTransformer.transform(key, parentGetter.get(key));
+ public ValueAndTimestamp<V1> get(final K key) {
+ final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
+ return ValueAndTimestamp.make(
+ valueTransformer.transform(key, getValueOrNull(valueAndTimestamp)),
+ valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index edd9644..a2695d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -17,12 +17,13 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
public interface KTableValueGetter<K, V> {
void init(ProcessorContext context);
- V get(K key);
+ ValueAndTimestamp<V> get(K key);
void close();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index ab2b506..910dd8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
/**
@@ -50,4 +51,13 @@ class TimestampedTupleForwarder<K, V> {
context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
}
}
+
+ public void maybeForward(final K key,
+ final V newValue,
+ final V oldValue,
+ final long timestamp) {
+ if (!cachingEnabled) {
+ context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(timestamp));
+ }
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
index dc124aa..fe19dbf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/To.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -88,4 +88,5 @@ public class To {
public int hashCode() {
throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
}
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
index 099c3df..f79b6f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
@@ -20,6 +20,8 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
@@ -40,7 +42,7 @@ public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
@Override
public KeyValue<K, V> next() {
final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
- return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+ return KeyValue.pair(innerKeyValue.key, getValueOrNull(innerKeyValue.value));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
index fe24043..862e6fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
public class ReadOnlyKeyValueStoreFacade<K, V> implements ReadOnlyKeyValueStore<K, V> {
protected final TimestampedKeyValueStore<K, V> inner;
@@ -30,8 +31,7 @@ public class ReadOnlyKeyValueStoreFacade<K, V> implements ReadOnlyKeyValueStore<
@Override
public V get(final K key) {
- final ValueAndTimestamp<V> valueAndTimestamp = inner.get(key);
- return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+ return getValueOrNull(inner.get(key));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
index 46db1f2..713959d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Instant;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, V> {
protected final TimestampedWindowStore<K, V> inner;
@@ -36,8 +38,7 @@ public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, V
@Override
public V fetch(final K key,
final long time) {
- final ValueAndTimestamp<V> valueAndTimestamp = inner.fetch(key, time);
- return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+ return getValueOrNull(inner.fetch(key, time));
}
@Override
@@ -117,7 +118,7 @@ public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, V
@Override
public KeyValue<Long, V> next() {
final KeyValue<Long, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
- return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+ return KeyValue.pair(innerKeyValue.key, getValueOrNull(innerKeyValue.value));
}
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e786a44..e6cf85d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -678,11 +678,11 @@ public class IntegrationTestUtils {
}
- public static void verifyKeyValueTimestamps(final Properties consumerConfig,
- final String topic,
- final List<KeyValueTimestamp<String, Long>> expected) {
+ public static <K, V> void verifyKeyValueTimestamps(final Properties consumerConfig,
+ final String topic,
+ final List<KeyValueTimestamp<K, V>> expected) {
- final List<ConsumerRecord<String, Long>> results;
+ final List<ConsumerRecord<K, V>> results;
try {
results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
} catch (final InterruptedException e) {
@@ -692,9 +692,9 @@ public class IntegrationTestUtils {
if (results.size() != expected.size()) {
throw new AssertionError(printRecords(results) + " != " + expected);
}
- final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
- for (final ConsumerRecord<String, Long> result : results) {
- final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
+ final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expected.iterator();
+ for (final ConsumerRecord<K, V> result : results) {
+ final KeyValueTimestamp<K, V> expected1 = expectedIterator.next();
try {
compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
} catch (final AssertionError e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index c43fcd8..b29903f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -18,14 +18,15 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
@@ -41,76 +42,64 @@ import static org.junit.Assert.assertEquals;
public class GlobalKTableJoinsTest {
private final StreamsBuilder builder = new StreamsBuilder();
- private final Map<String, String> results = new HashMap<>();
private final String streamTopic = "stream";
private final String globalTopic = "global";
private GlobalKTable<String, String> global;
private KStream<String, String> stream;
private KeyValueMapper<String, String, String> keyValueMapper;
- private ForeachAction<String, String> action;
-
@Before
public void setUp() {
final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
global = builder.globalTable(globalTopic, consumed);
stream = builder.stream(streamTopic, consumed);
- keyValueMapper = new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(final String key, final String value) {
- return value;
- }
- };
- action = new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- results.put(key, value);
- }
- };
+ keyValueMapper = (key, value) -> value;
}
@Test
public void shouldLeftJoinWithStream() {
+ final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
stream
.leftJoin(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
- .foreach(action);
-
- final Map<String, String> expected = new HashMap<>();
- expected.put("1", "a+A");
- expected.put("2", "b+B");
- expected.put("3", "c+null");
+ .process(supplier);
- verifyJoin(expected);
+ final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put("1", ValueAndTimestamp.make("a+A", 2L));
+ expected.put("2", ValueAndTimestamp.make("b+B", 10L));
+ expected.put("3", ValueAndTimestamp.make("c+null", 3L));
+ verifyJoin(expected, supplier);
}
@Test
public void shouldInnerJoinWithStream() {
+ final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
stream
.join(global, keyValueMapper, MockValueJoiner.TOSTRING_JOINER)
- .foreach(action);
+ .process(supplier);
- final Map<String, String> expected = new HashMap<>();
- expected.put("1", "a+A");
- expected.put("2", "b+B");
+ final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put("1", ValueAndTimestamp.make("a+A", 2L));
+ expected.put("2", ValueAndTimestamp.make("b+B", 10L));
- verifyJoin(expected);
+ verifyJoin(expected, supplier);
}
- private void verifyJoin(final Map<String, String> expected) {
+ private void verifyJoin(final Map<String, ValueAndTimestamp<String>> expected,
+ final MockProcessorSupplier<String, String> supplier) {
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// write some data to the global table
- driver.pipeInput(recordFactory.create(globalTopic, "a", "A"));
- driver.pipeInput(recordFactory.create(globalTopic, "b", "B"));
+ driver.pipeInput(recordFactory.create(globalTopic, "a", "A", 1L));
+ driver.pipeInput(recordFactory.create(globalTopic, "b", "B", 5L));
//write some data to the stream
- driver.pipeInput(recordFactory.create(streamTopic, "1", "a"));
- driver.pipeInput(recordFactory.create(streamTopic, "2", "b"));
- driver.pipeInput(recordFactory.create(streamTopic, "3", "c"));
+ driver.pipeInput(recordFactory.create(streamTopic, "1", "a", 2L));
+ driver.pipeInput(recordFactory.create(streamTopic, "2", "b", 10L));
+ driver.pipeInput(recordFactory.create(streamTopic, "3", "c", 3L));
}
- assertEquals(expected, results);
+ assertEquals(expected, supplier.theCapturedProcessor().lastValueAndTimestampPerKey);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 97d1566..0c850e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
@@ -50,7 +51,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -412,11 +412,20 @@ public class KGroupedStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
+ {
+ final KeyValueStore<String, Long> count = driver.getKeyValueStore("count");
- assertThat(count.get("1"), equalTo(3L));
- assertThat(count.get("2"), equalTo(1L));
- assertThat(count.get("3"), equalTo(2L));
+ assertThat(count.get("1"), equalTo(3L));
+ assertThat(count.get("2"), equalTo(1L));
+ assertThat(count.get("3"), equalTo(2L));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<Long>> count = driver.getTimestampedKeyValueStore("count");
+
+ assertThat(count.get("1"), equalTo(ValueAndTimestamp.make(3L, 10L)));
+ assertThat(count.get("2"), equalTo(ValueAndTimestamp.make(1L, 1L)));
+ assertThat(count.get("3"), equalTo(ValueAndTimestamp.make(2L, 9L)));
+ }
}
}
@@ -448,11 +457,20 @@ public class KGroupedStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
+ {
+ final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
+
+ assertThat(reduced.get("1"), equalTo("A+C+D"));
+ assertThat(reduced.get("2"), equalTo("B"));
+ assertThat(reduced.get("3"), equalTo("E+F"));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<String>> reduced = driver.getTimestampedKeyValueStore("reduce");
- assertThat(reduced.get("1"), equalTo("A+C+D"));
- assertThat(reduced.get("2"), equalTo("B"));
- assertThat(reduced.get("3"), equalTo("E+F"));
+ assertThat(reduced.get("1"), equalTo(ValueAndTimestamp.make("A+C+D", 10L)));
+ assertThat(reduced.get("2"), equalTo(ValueAndTimestamp.make("B", 1L)));
+ assertThat(reduced.get("3"), equalTo(ValueAndTimestamp.make("E+F", 9L)));
+ }
}
}
@@ -491,84 +509,110 @@ public class KGroupedStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+ {
+ final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+
+ assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+ assertThat(aggregate.get("2"), equalTo("0+B"));
+ assertThat(aggregate.get("3"), equalTo("0+E+F"));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<String>> aggregate = driver.getTimestampedKeyValueStore("aggregate");
- assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
- assertThat(aggregate.get("2"), equalTo("0+B"));
- assertThat(aggregate.get("3"), equalTo("0+E+F"));
+ assertThat(aggregate.get("1"), equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
+ assertThat(aggregate.get("2"), equalTo(ValueAndTimestamp.make("0+B", 1L)));
+ assertThat(aggregate.get("3"), equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
+ }
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateWithDefaultSerdes() {
- final Map<String, String> results = new HashMap<>();
+ final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
groupedStream
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
.toStream()
- .foreach(results::put);
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- assertThat(results.get("1"), equalTo("0+A+C+D"));
- assertThat(results.get("2"), equalTo("0+B"));
- assertThat(results.get("3"), equalTo("0+E+F"));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("1"),
+ equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("2"),
+ equalTo(ValueAndTimestamp.make("0+B", 1L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("3"),
+ equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
}
}
private void processData(final TopologyTestDriver driver) {
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A"));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B"));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "C"));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "D"));
- driver.pipeInput(recordFactory.create(TOPIC, "3", "E"));
- driver.pipeInput(recordFactory.create(TOPIC, "3", "F"));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 5L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 1L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 3L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "D", 10L));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "E", 8L));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "F", 9L));
driver.pipeInput(recordFactory.create(TOPIC, "3", (String) null));
}
- private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
+ private void doCountWindowed(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 0));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 0L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 499L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 100L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 0L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 100L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 200L));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "C", 1L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L));
}
- assertThat(results, equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
+ assertThat(supplier.theCapturedProcessor().processedWithTimestamps, equalTo(Arrays.asList(
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
+ new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
+ new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
+ new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
+ new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
)));
}
@Test
public void shouldCountWindowed() {
- final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count(Materialized.as("aggregate-by-key-windowed"))
.toStream()
- .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
+ .process(supplier);
- doCountWindowed(results);
+ doCountWindowed(supplier);
}
@Test
public void shouldCountWindowedWithInternalStoreName() {
- final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+ final List<KeyValue<Windowed<String>, KeyValue<Long, Long>>> results = new ArrayList<>();
groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()
.toStream()
- .foreach((key, value) -> results.add(KeyValue.pair(key, value)));
+ .process(supplier);
- doCountWindowed(results);
+ doCountWindowed(supplier);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 34002ae..a8fed67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -31,16 +31,17 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -124,32 +125,32 @@ public class KGroupedTableImplTest {
Materialized.as(INVALID_STORE_NAME));
}
- private Map<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
- final Map<String, Integer> reducedResults = new HashMap<>();
+ private MockProcessorSupplier<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
+ final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
inputKTable
.toStream()
- .foreach(reducedResults::put);
- return reducedResults;
+ .process(supplier);
+ return supplier;
}
- private void assertReduced(final Map<String, Integer> reducedResults,
+ private void assertReduced(final Map<String, ValueAndTimestamp<Integer>> reducedResults,
final String topic,
final TopologyTestDriver driver) {
final ConsumerRecordFactory<String, Double> recordFactory =
new ConsumerRecordFactory<>(new StringSerializer(), new DoubleSerializer());
driver.pipeInput(recordFactory.create(topic, "A", 1.1, 10));
- driver.pipeInput(recordFactory.create(topic, "B", 2.2, 10));
+ driver.pipeInput(recordFactory.create(topic, "B", 2.2, 11));
- assertEquals(Integer.valueOf(1), reducedResults.get("A"));
- assertEquals(Integer.valueOf(2), reducedResults.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 10L), reducedResults.get("A"));
+ assertEquals(ValueAndTimestamp.make(2, 11L), reducedResults.get("B"));
- driver.pipeInput(recordFactory.create(topic, "A", 2.6, 10));
- driver.pipeInput(recordFactory.create(topic, "B", 1.3, 10));
- driver.pipeInput(recordFactory.create(topic, "A", 5.7, 10));
- driver.pipeInput(recordFactory.create(topic, "B", 6.2, 10));
+ driver.pipeInput(recordFactory.create(topic, "A", 2.6, 30));
+ driver.pipeInput(recordFactory.create(topic, "B", 1.3, 30));
+ driver.pipeInput(recordFactory.create(topic, "A", 5.7, 50));
+ driver.pipeInput(recordFactory.create(topic, "B", 6.2, 20));
- assertEquals(Integer.valueOf(5), reducedResults.get("A"));
- assertEquals(Integer.valueOf(6), reducedResults.get("B"));
+ assertEquals(ValueAndTimestamp.make(5, 50L), reducedResults.get("A"));
+ assertEquals(ValueAndTimestamp.make(6, 30L), reducedResults.get("B"));
}
@Test
@@ -170,9 +171,9 @@ public class KGroupedTableImplTest {
MockReducer.INTEGER_SUBTRACTOR,
Materialized.as("reduced"));
- final Map<String, Integer> results = getReducedResults(reduced);
+ final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- assertReduced(results, topic, driver);
+ assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
assertEquals(reduced.queryableStoreName(), "reduced");
}
}
@@ -192,9 +193,9 @@ public class KGroupedTableImplTest {
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
- final Map<String, Integer> results = getReducedResults(reduced);
+ final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- assertReduced(results, topic, driver);
+ assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
assertNull(reduced.queryableStoreName());
}
}
@@ -217,12 +218,19 @@ public class KGroupedTableImplTest {
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()));
- final Map<String, Integer> results = getReducedResults(reduced);
+ final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- assertReduced(results, topic, driver);
- final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
- assertThat(reduce.get("A"), equalTo(5));
- assertThat(reduce.get("B"), equalTo(6));
+ assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
+ {
+ final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
+ assertThat(reduce.get("A"), equalTo(5));
+ assertThat(reduce.get("B"), equalTo(6));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<Integer>> reduce = driver.getTimestampedKeyValueStore("reduce");
+ assertThat(reduce.get("A"), equalTo(ValueAndTimestamp.make(5, 50L)));
+ assertThat(reduce.get("B"), equalTo(ValueAndTimestamp.make(6, 30L)));
+ }
}
}
@@ -243,9 +251,16 @@ public class KGroupedTableImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(topic, driver);
- final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
- assertThat(counts.get("1"), equalTo(3L));
- assertThat(counts.get("2"), equalTo(2L));
+ {
+ final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
+ assertThat(counts.get("1"), equalTo(3L));
+ assertThat(counts.get("2"), equalTo(2L));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<Long>> counts = driver.getTimestampedKeyValueStore("count");
+ assertThat(counts.get("1"), equalTo(ValueAndTimestamp.make(3L, 50L)));
+ assertThat(counts.get("2"), equalTo(ValueAndTimestamp.make(2L, 60L)));
+ }
}
}
@@ -269,9 +284,18 @@ public class KGroupedTableImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(topic, driver);
- final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
- assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
- assertThat(aggregate.get("2"), equalTo("0+2+2"));
+ {
+ {
+ final KeyValueStore<String, String> aggregate = driver.getKeyValueStore("aggregate");
+ assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
+ assertThat(aggregate.get("2"), equalTo("0+2+2"));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<String>> aggregate = driver.getTimestampedKeyValueStore("aggregate");
+ assertThat(aggregate.get("1"), equalTo(ValueAndTimestamp.make("0+1+1+1", 50L)));
+ assertThat(aggregate.get("2"), equalTo(ValueAndTimestamp.make("0+2+2", 60L)));
+ }
+ }
}
}
@@ -347,10 +371,10 @@ public class KGroupedTableImplTest {
final TopologyTestDriver driver) {
final ConsumerRecordFactory<String, String> recordFactory =
new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- driver.pipeInput(recordFactory.create(topic, "A", "1"));
- driver.pipeInput(recordFactory.create(topic, "B", "1"));
- driver.pipeInput(recordFactory.create(topic, "C", "1"));
- driver.pipeInput(recordFactory.create(topic, "D", "2"));
- driver.pipeInput(recordFactory.create(topic, "E", "2"));
+ driver.pipeInput(recordFactory.create(topic, "A", "1", 10L));
+ driver.pipeInput(recordFactory.create(topic, "B", "1", 50L));
+ driver.pipeInput(recordFactory.create(topic, "C", "1", 30L));
+ driver.pipeInput(recordFactory.create(topic, "D", "2", 40L));
+ driver.pipeInput(recordFactory.create(topic, "E", "2", 60L));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index b977346..77d5269 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -46,24 +46,9 @@ public class KStreamBranchTest {
public void testKStreamBranch() {
final StreamsBuilder builder = new StreamsBuilder();
- final Predicate<Integer, String> isEven = new Predicate<Integer, String>() {
- @Override
- public boolean test(final Integer key, final String value) {
- return (key % 2) == 0;
- }
- };
- final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
- @Override
- public boolean test(final Integer key, final String value) {
- return (key % 3) == 0;
- }
- };
- final Predicate<Integer, String> isOdd = new Predicate<Integer, String>() {
- @Override
- public boolean test(final Integer key, final String value) {
- return (key % 2) != 0;
- }
- };
+ final Predicate<Integer, String> isEven = (key, value) -> (key % 2) == 0;
+ final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
+ final Predicate<Integer, String> isOdd = (key, value) -> (key % 2) != 0;
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6};
@@ -92,24 +77,14 @@ public class KStreamBranchTest {
assertEquals(2, processors.get(2).processed.size());
}
+ @SuppressWarnings("unchecked")
@Test
public void testTypeVariance() {
- final Predicate<Number, Object> positive = new Predicate<Number, Object>() {
- @Override
- public boolean test(final Number key, final Object value) {
- return key.doubleValue() > 0;
- }
- };
+ final Predicate<Number, Object> positive = (key, value) -> key.doubleValue() > 0;
- final Predicate<Number, Object> negative = new Predicate<Number, Object>() {
- @Override
- public boolean test(final Number key, final Object value) {
- return key.doubleValue() < 0;
- }
- };
+ final Predicate<Number, Object> negative = (key, value) -> key.doubleValue() < 0;
- @SuppressWarnings("unchecked")
- final KStream<Integer, String>[] branches = new StreamsBuilder()
+ new StreamsBuilder()
.<Integer, String>stream("empty")
.branch(positive, negative);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index d7e6240..3f8f835 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -39,12 +39,7 @@ public class KStreamFilterTest {
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
- private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
- @Override
- public boolean test(final Integer key, final String value) {
- return (key % 3) == 0;
- }
- };
+ private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
@Test
public void testFilter() {
@@ -52,9 +47,8 @@ public class KStreamFilterTest {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final KStream<Integer, String> stream;
- final MockProcessorSupplier<Integer, String> supplier;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- supplier = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filter(isMultipleOfThree).process(supplier);
@@ -73,9 +67,8 @@ public class KStreamFilterTest {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final KStream<Integer, String> stream;
- final MockProcessorSupplier<Integer, String> supplier;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
- supplier = new MockProcessorSupplier<>();
stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.filterNot(isMultipleOfThree).process(supplier);
@@ -90,12 +83,7 @@ public class KStreamFilterTest {
@Test
public void testTypeVariance() {
- final Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
- @Override
- public boolean test(final Number key, final Object value) {
- return false;
- }
- };
+ final Predicate<Number, Object> numberKeyPredicate = (key, value) -> false;
new StreamsBuilder()
.<Integer, String>stream("empty")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index feed2fb..c6f26c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -62,12 +62,7 @@ public class KStreamForeachTest {
final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
final ForeachAction<Integer, String> action =
- new ForeachAction<Integer, String>() {
- @Override
- public void apply(final Integer key, final String value) {
- actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
- }
- };
+ (key, value) -> actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
// When
final StreamsBuilder builder = new StreamsBuilder();
@@ -91,10 +86,7 @@ public class KStreamForeachTest {
@Test
public void testTypeVariance() {
- final ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
- @Override
- public void apply(final Number key, final Object value) {}
- };
+ final ForeachAction<Number, Object> consume = (key, value) -> { };
new StreamsBuilder()
.<Integer, String>stream("emptyTopic")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index f98df4c..802aa33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -42,11 +42,14 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
public class KStreamGlobalKTableJoinTest {
+ private final static String[] EMPTY = new String[0];
+
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
+ private final int[] expectedKeys = {0, 1, 2, 3};
+
private TopologyTestDriver driver;
private MockProcessor<Integer, String> processor;
- private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
@@ -83,7 +86,7 @@ public class KStreamGlobalKTableJoinTest {
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
final ConsumerRecordFactory<Integer, String> recordFactory =
- new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
+ new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L, 1L);
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@@ -123,7 +126,7 @@ public class KStreamGlobalKTableJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
@@ -132,32 +135,32 @@ public class KStreamGlobalKTableJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)");
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "YY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 0)", "2:X2,FKey2+YY2 (ts: 0)", "3:X3,FKey3+YY3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 1)", "2:X2,FKey2+YY2 (ts: 2)", "3:X3,FKey3+YY3 (ts: 3)");
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "YYY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
@@ -166,12 +169,12 @@ public class KStreamGlobalKTableJoinTest {
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)");
}
@@ -181,22 +184,22 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the globalTable. this should not produce any item.
pushToGlobalTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)", "2:X2,FKey2+Y2 (ts: 0)", "3:X3,FKey3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+Y2 (ts: 2)", "3:X3,FKey3+Y3 (ts: 3)");
// push two items with null to the globalTable as deletes. this should not produce any item.
pushNullValueToGlobalTable(2);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "XX", true);
- processor.checkAndClearProcessResult("2:XX2,FKey2+Y2 (ts: 0)", "3:XX3,FKey3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("2:XX2,FKey2+Y2 (ts: 2)", "3:XX3,FKey3+Y3 (ts: 3)");
}
@Test
@@ -205,13 +208,13 @@ public class KStreamGlobalKTableJoinTest {
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
// this should not produce any item.
pushToStream(4, "XXX", false);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index b8925bc..a6361da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -42,15 +42,16 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
public class KStreamGlobalKTableLeftJoinTest {
- final private String streamTopic = "streamTopic";
- final private String globalTableTopic = "globalTableTopic";
+ private final static String[] EMPTY = new String[0];
+
+ private final String streamTopic = "streamTopic";
+ private final String globalTableTopic = "globalTableTopic";
+ private final int[] expectedKeys = {0, 1, 2, 3};
private MockProcessor<Integer, String> processor;
private TopologyTestDriver driver;
private StreamsBuilder builder;
- private final int[] expectedKeys = {0, 1, 2, 3};
-
@Before
public void setUp() {
@@ -85,7 +86,7 @@ public class KStreamGlobalKTableLeftJoinTest {
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
final ConsumerRecordFactory<Integer, String> recordFactory =
- new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
+ new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L, 1L);
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@@ -97,7 +98,7 @@ public class KStreamGlobalKTableLeftJoinTest {
private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
final ConsumerRecordFactory<String, String> recordFactory =
- new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
+ new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 1L);
for (int i = 0; i < messageCount; i++) {
driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]));
}
@@ -105,7 +106,7 @@ public class KStreamGlobalKTableLeftJoinTest {
private void pushNullValueToGlobalTable(final int messageCount) {
final ConsumerRecordFactory<String, String> recordFactory =
- new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
+ new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 1L);
for (int i = 0; i < messageCount; i++) {
driver.pipeInput(recordFactory.create(globalTableTopic, "FKey" + expectedKeys[i], (String) null));
}
@@ -125,7 +126,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 1)");
}
@Test
@@ -134,32 +135,32 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+null (ts: 0)", "1:X1,FKey1+null (ts: 1)");
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)", "2:X2,FKey2+null (ts: 0)", "3:X3,FKey3+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+null (ts: 2)", "3:X3,FKey3+null (ts: 3)");
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "YY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 0)", "2:X2,FKey2+YY2 (ts: 0)", "3:X3,FKey3+YY3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+YY0 (ts: 0)", "1:X1,FKey1+YY1 (ts: 1)", "2:X2,FKey2+YY2 (ts: 2)", "3:X3,FKey3+YY3 (ts: 3)");
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "YYY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
@@ -168,12 +169,12 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)", "2:X2,FKey2+null (ts: 0)", "3:X3,FKey3+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+null (ts: 2)", "3:X3,FKey3+null (ts: 3)");
}
@@ -183,22 +184,22 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the globalTable. this should not produce any item.
pushToGlobalTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
- processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 0)", "2:X2,FKey2+Y2 (ts: 0)", "3:X3,FKey3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0,FKey0+Y0 (ts: 0)", "1:X1,FKey1+Y1 (ts: 1)", "2:X2,FKey2+Y2 (ts: 2)", "3:X3,FKey3+Y3 (ts: 3)");
// push two items with null to the globalTable as deletes. this should not produce any item.
pushNullValueToGlobalTable(2);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "XX", true);
- processor.checkAndClearProcessResult("0:XX0,FKey0+null (ts: 0)", "1:XX1,FKey1+null (ts: 0)", "2:XX2,FKey2+Y2 (ts: 0)", "3:XX3,FKey3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:XX0,FKey0+null (ts: 0)", "1:XX1,FKey1+null (ts: 1)", "2:XX2,FKey2+Y2 (ts: 2)", "3:XX3,FKey3+Y3 (ts: 3)");
}
@Test
@@ -207,13 +208,13 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all items to the globalTable. this should not produce any item
pushToGlobalTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
// this should produce four items.
pushToStream(4, "XXX", false);
- processor.checkAndClearProcessResult("0:XXX0+null (ts: 0)", "1:XXX1+null (ts: 0)", "2:XXX2+null (ts: 0)", "3:XXX3+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:XXX0+null (ts: 0)", "1:XXX1+null (ts: 1)", "2:XXX2+null (ts: 2)", "3:XXX3+null (ts: 3)");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 5a1b579..0508d7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -695,17 +695,17 @@ public class KStreamImplTest {
merged.process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
- driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
- driver.pipeInput(recordFactory.create(topic3, "C", "cc"));
- driver.pipeInput(recordFactory.create(topic4, "D", "dd"));
- driver.pipeInput(recordFactory.create(topic4, "E", "ee"));
- driver.pipeInput(recordFactory.create(topic3, "F", "ff"));
- driver.pipeInput(recordFactory.create(topic2, "G", "gg"));
- driver.pipeInput(recordFactory.create(topic1, "H", "hh"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "aa", 1L));
+ driver.pipeInput(recordFactory.create(topic2, "B", "bb", 9L));
+ driver.pipeInput(recordFactory.create(topic3, "C", "cc", 2L));
+ driver.pipeInput(recordFactory.create(topic4, "D", "dd", 8L));
+ driver.pipeInput(recordFactory.create(topic4, "E", "ee", 3L));
+ driver.pipeInput(recordFactory.create(topic3, "F", "ff", 7L));
+ driver.pipeInput(recordFactory.create(topic2, "G", "gg", 4L));
+ driver.pipeInput(recordFactory.create(topic1, "H", "hh", 6L));
}
- assertEquals(asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)", "E:ee (ts: 0)", "F:ff (ts: 0)", "G:gg (ts: 0)", "H:hh (ts: 0)"),
+ assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 9)", "C:cc (ts: 2)", "D:dd (ts: 8)", "E:ee (ts: 3)", "F:ff (ts: 7)", "G:gg (ts: 4)", "H:hh (ts: 6)"),
processorSupplier.theCapturedProcessor().processed);
}
@@ -716,14 +716,14 @@ public class KStreamImplTest {
pattern2Source.process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
- driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
- driver.pipeInput(recordFactory.create("topic-5", "C", "cc"));
- driver.pipeInput(recordFactory.create("topic-6", "D", "dd"));
- driver.pipeInput(recordFactory.create("topic-7", "E", "ee"));
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa", 1L));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb", 5L));
+ driver.pipeInput(recordFactory.create("topic-5", "C", "cc", 10L));
+ driver.pipeInput(recordFactory.create("topic-6", "D", "dd", 8L));
+ driver.pipeInput(recordFactory.create("topic-7", "E", "ee", 3L));
}
- assertEquals(asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)", "E:ee (ts: 0)"),
+ assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 5)", "C:cc (ts: 10)", "D:dd (ts: 8)", "E:ee (ts: 3)"),
processorSupplier.theCapturedProcessor().processed);
}
@@ -739,14 +739,14 @@ public class KStreamImplTest {
merged.process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create("topic-3", "A", "aa"));
- driver.pipeInput(recordFactory.create("topic-4", "B", "bb"));
- driver.pipeInput(recordFactory.create("topic-A", "C", "cc"));
- driver.pipeInput(recordFactory.create("topic-Z", "D", "dd"));
- driver.pipeInput(recordFactory.create(topic3, "E", "ee"));
+ driver.pipeInput(recordFactory.create("topic-3", "A", "aa", 1L));
+ driver.pipeInput(recordFactory.create("topic-4", "B", "bb", 5L));
+ driver.pipeInput(recordFactory.create("topic-A", "C", "cc", 10L));
+ driver.pipeInput(recordFactory.create("topic-Z", "D", "dd", 8L));
+ driver.pipeInput(recordFactory.create(topic3, "E", "ee", 3L));
}
- assertEquals(asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)", "E:ee (ts: 0)"),
+ assertEquals(asList("A:aa (ts: 1)", "B:bb (ts: 5)", "C:cc (ts: 10)", "D:dd (ts: 8)", "E:ee (ts: 3)"),
processorSupplier.theCapturedProcessor().processed);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index fbd2750..18d601a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -47,9 +47,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKStreamJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
+ private final static String[] EMPTY = new String[0];
+ private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
@@ -118,7 +119,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < 2; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i]));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push two items to the other stream; this should produce two items
// w1 = { 0:A0, 1:A1 }
@@ -300,7 +301,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < 2; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push two items to the other stream; this should produce two items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
@@ -322,7 +323,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "B" + expectedKeys[i], time + i));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items to the other stream with fixed larger timestamp; this should produce four items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -412,7 +413,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "f" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items to the other stream with timestamp before the window bound; this should produce no items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -436,7 +437,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "g" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items to the other stream with with incremented timestamp; this should produce one item
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0),
@@ -566,7 +567,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "l" + expectedKeys[i], time + i));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with larger timestamps to the primary stream; this should produce four items
// w1 = {}
@@ -637,7 +638,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic1, expectedKey, "G" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with smaller timestamps (before window) to the primary stream; this should produce no items
// w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -657,7 +658,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic1, expectedKey, "H" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with increased timestamps to the primary stream; this should produce one item
// w1 = { 0:C0 (ts: 2100), 1:C1 (ts: 2100), 2:C2 (ts: 2100), 3:C3 (ts: 2100),
@@ -801,7 +802,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time + i));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items smaller timestamps (out of window) to the secondary stream; this should produce no items
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -812,7 +813,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with increased timestamps to the secondary stream; this should produce one item
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -992,7 +993,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
}
@@ -1035,7 +1036,7 @@ public class KStreamKStreamJoinTest {
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "A" + expectedKeys[i], time + i));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with smaller timestamps (before the window) to the other stream; this should produce no items
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1046,7 +1047,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "a" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with increased timestamp to the other stream; this should produce one item
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
@@ -1226,7 +1227,7 @@ public class KStreamKStreamJoinTest {
for (final int expectedKey : expectedKeys) {
driver.pipeInput(recordFactory.create(topic2, expectedKey, "j" + expectedKey, time));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
}
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c3d62e9..af8ec8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -43,9 +43,10 @@ import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
public class KStreamKStreamLeftJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
+ private final static String[] EMPTY = new String[0];
+ private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
@@ -201,7 +202,7 @@ public class KStreamKStreamLeftJoinTest {
for (int i = 0; i < expectedKeys.length; i++) {
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "b" + expectedKeys[i], time + i));
}
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push four items with larger timestamp to the primary stream; this should produce four full-join items
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index ef36b6e..e8d962a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -39,6 +39,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -47,12 +48,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
+ private final static String[] EMPTY = new String[0];
+
private final String streamTopic = "streamTopic";
private final String tableTopic = "tableTopic";
-
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
-
private final int[] expectedKeys = {0, 1, 2, 3};
private MockProcessor<Integer, String> processor;
@@ -85,13 +86,18 @@ public class KStreamKTableJoinTest {
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i], i));
}
}
private void pushToTable(final int messageCount, final String valuePrefix) {
+ final Random r = new Random(System.currentTimeMillis());
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(
+ tableTopic,
+ expectedKeys[i],
+ valuePrefix + expectedKeys[i],
+ r.nextInt(Integer.MAX_VALUE)));
}
}
@@ -114,45 +120,45 @@ public class KStreamKTableJoinTest {
public void shouldNotJoinWithEmptyTableOnStreamUpdates() {
// push two items to the primary stream. the table is empty
pushToStream(2, "X");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
public void shouldNotJoinOnTableUpdates() {
// push two items to the primary stream. the table is empty
pushToStream(2, "X");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push two items to the table. this should not produce any item.
pushToTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)");
// push all items to the table. this should not produce any item
pushToTable(4, "YY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)", "3:X3+YY3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 1)", "2:X2+YY2 (ts: 2)", "3:X3+YY3 (ts: 3)");
// push all items to the table. this should not produce any item
pushToTable(4, "YYY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
public void shouldJoinOnlyIfMatchFoundOnStreamUpdates() {
// push two items to the table. this should not produce any item.
pushToTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)");
}
@@ -160,19 +166,19 @@ public class KStreamKTableJoinTest {
public void shouldClearTableEntryOnNullValueUpdates() {
// push all four items to the table. this should not produce any item.
pushToTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+Y2 (ts: 0)", "3:X3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+Y2 (ts: 2)", "3:X3+Y3 (ts: 3)");
// push two items with null to the table as deletes. this should not produce any item.
pushNullValueToTable();
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "XX");
- processor.checkAndClearProcessResult("2:XX2+Y2 (ts: 0)", "3:XX3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("2:XX2+Y2 (ts: 2)", "3:XX3+Y3 (ts: 3)");
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index e04da17..b83e53c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -38,20 +38,22 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class KStreamKTableLeftJoinTest {
- final private String streamTopic = "streamTopic";
- final private String tableTopic = "tableTopic";
+ private final static String[] EMPTY = new String[0];
+ private final String streamTopic = "streamTopic";
+ private final String tableTopic = "tableTopic";
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
+ private final int[] expectedKeys = {0, 1, 2, 3};
+
private TopologyTestDriver driver;
private MockProcessor<Integer, String> processor;
-
- private final int[] expectedKeys = {0, 1, 2, 3};
private StreamsBuilder builder;
@Before
@@ -80,13 +82,18 @@ public class KStreamKTableLeftJoinTest {
private void pushToStream(final int messageCount, final String valuePrefix) {
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(streamTopic, expectedKeys[i], valuePrefix + expectedKeys[i], i));
}
}
private void pushToTable(final int messageCount, final String valuePrefix) {
+ final Random r = new Random(System.currentTimeMillis());
for (int i = 0; i < messageCount; i++) {
- driver.pipeInput(recordFactory.create(tableTopic, expectedKeys[i], valuePrefix + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(
+ tableTopic,
+ expectedKeys[i],
+ valuePrefix + expectedKeys[i],
+ r.nextInt(Integer.MAX_VALUE)));
}
}
@@ -109,45 +116,45 @@ public class KStreamKTableLeftJoinTest {
public void shouldJoinWithEmptyTableOnStreamUpdates() {
// push two items to the primary stream. the table is empty
pushToStream(2, "X");
- processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 1)");
}
@Test
public void shouldNotJoinOnTableUpdates() {
// push two items to the primary stream. the table is empty
pushToStream(2, "X");
- processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+null (ts: 0)", "1:X1+null (ts: 1)");
// push two items to the table. this should not produce any item.
pushToTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+null (ts: 0)", "3:X3+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+null (ts: 2)", "3:X3+null (ts: 3)");
// push all items to the table. this should not produce any item
pushToTable(4, "YY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 0)", "2:X2+YY2 (ts: 0)", "3:X3+YY3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+YY0 (ts: 0)", "1:X1+YY1 (ts: 1)", "2:X2+YY2 (ts: 2)", "3:X3+YY3 (ts: 3)");
// push all items to the table. this should not produce any item
pushToTable(4, "YYY");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
}
@Test
public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
// push two items to the table. this should not produce any item.
pushToTable(2, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+null (ts: 0)", "3:X3+null (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+null (ts: 2)", "3:X3+null (ts: 3)");
}
@@ -155,19 +162,19 @@ public class KStreamKTableLeftJoinTest {
public void shouldClearTableEntryOnNullValueUpdates() {
// push all four items to the table. this should not produce any item.
pushToTable(4, "Y");
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X");
- processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 0)", "2:X2+Y2 (ts: 0)", "3:X3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:X0+Y0 (ts: 0)", "1:X1+Y1 (ts: 1)", "2:X2+Y2 (ts: 2)", "3:X3+Y3 (ts: 3)");
// push two items with null to the table as deletes. this should not produce any item.
pushNullValueToTable(2);
- processor.checkAndClearProcessResult();
+ processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "XX");
- processor.checkAndClearProcessResult("0:XX0+null (ts: 0)", "1:XX1+null (ts: 0)", "2:XX2+Y2 (ts: 0)", "3:XX3+Y3 (ts: 0)");
+ processor.checkAndClearProcessResult("0:XX0+null (ts: 0)", "1:XX1+null (ts: 1)", "2:XX2+Y2 (ts: 2)", "3:XX3+Y3 (ts: 3)");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 7ea570d..3a346d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -50,11 +50,11 @@ public class KStreamMapTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey, 10L - expectedKey));
}
}
- final String[] expected = new String[]{"V0:0 (ts: 0)", "V1:1 (ts: 0)", "V2:2 (ts: 0)", "V3:3 (ts: 0)"};
+ final String[] expected = new String[]{"V0:0 (ts: 10)", "V1:1 (ts: 9)", "V2:2 (ts: 8)", "V3:3 (ts: 7)"};
assertEquals(4, supplier.theCapturedProcessor().processed.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index ab9e68e..3813495 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -51,10 +51,10 @@ public class KStreamMapValuesTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey), expectedKey / 2L));
}
}
- final String[] expected = {"1:1 (ts: 0)", "10:2 (ts: 0)", "100:3 (ts: 0)", "1000:4 (ts: 0)"};
+ final String[] expected = {"1:1 (ts: 0)", "10:2 (ts: 5)", "100:3 (ts: 50)", "1000:4 (ts: 500)"};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@@ -72,10 +72,10 @@ public class KStreamMapValuesTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey)));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, Integer.toString(expectedKey), expectedKey / 2L));
}
}
- final String[] expected = {"1:2 (ts: 0)", "10:12 (ts: 0)", "100:103 (ts: 0)", "1000:1004 (ts: 0)"};
+ final String[] expected = {"1:2 (ts: 0)", "10:12 (ts: 5)", "100:103 (ts: 50)", "1000:1004 (ts: 500)"};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 780bcae..4b7b04d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -75,11 +75,6 @@ public class KStreamPeekTest {
}
private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {
- return new ForeachAction<K, V>() {
- @Override
- public void apply(final K key, final V value) {
- into.add(new KeyValue<>(key, value));
- }
- };
+ return (key, value) -> into.add(new KeyValue<>(key, value));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 2ea7700..0dfd8ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -299,8 +299,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setTime(GAP_MS + 1);
processor.process("a", "1");
processor.process("a", "2");
- final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 0)));
- final long t1 = getter.get(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1)));
+ final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 0))).value();
+ final long t1 = getter.get(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1))).value();
assertEquals(1L, t0);
assertEquals(2L, t1);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index de60fb9..8f87d40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -77,6 +77,7 @@ public class KStreamTransformTest {
kstreamDriver.setUp(builder);
for (final int expectedKey : expectedKeys) {
+ kstreamDriver.setTime(expectedKey / 2L);
kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
}
@@ -88,7 +89,7 @@ public class KStreamTransformTest {
//String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
- final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 0)", "200:1110 (ts: 0)", "2000:11110 (ts: 0)"};
+ final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 5)", "200:1110 (ts: 50)", "2000:11110 (ts: 500)"};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index a3ce830..a786166 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -84,10 +84,10 @@ public class KStreamTransformValuesTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, expectedKey / 2L));
}
}
- final String[] expected = {"1:10 (ts: 0)", "10:110 (ts: 0)", "100:1110 (ts: 0)", "1000:11110 (ts: 0)"};
+ final String[] expected = {"1:10 (ts: 0)", "10:110 (ts: 5)", "100:1110 (ts: 50)", "1000:11110 (ts: 500)"};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@@ -121,10 +121,10 @@ public class KStreamTransformValuesTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+ driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, expectedKey / 2L));
}
}
- final String[] expected = {"1:11 (ts: 0)", "10:121 (ts: 0)", "100:1221 (ts: 0)", "1000:12221 (ts: 0)"};
+ final String[] expected = {"1:11 (ts: 0)", "10:121 (ts: 5)", "100:1221 (ts: 50)", "1000:12221 (ts: 500)"};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 0196c41..1c7abb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -91,11 +91,16 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "D", "4", 7L));
driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L));
driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
+
driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
driver.pipeInput(recordFactory.create(topic1, "B", "2", 11L));
driver.pipeInput(recordFactory.create(topic1, "D", "4", 12L));
driver.pipeInput(recordFactory.create(topic1, "B", "2", 13L));
driver.pipeInput(recordFactory.create(topic1, "C", "3", 14L));
+
+ driver.pipeInput(recordFactory.create(topic1, "B", "1", 3L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 2L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "3", 9L));
}
assertEquals(
@@ -116,8 +121,14 @@ public class KStreamWindowAggregateTest {
"[B@5/15]:0+2+2+2 (ts: 11)", "[B@10/20]:0+2 (ts: 11)",
"[D@5/15]:0+4+4 (ts: 12)", "[D@10/20]:0+4 (ts: 12)",
"[B@5/15]:0+2+2+2+2 (ts: 13)", "[B@10/20]:0+2+2 (ts: 13)",
- "[C@5/15]:0+3+3 (ts: 14)", "[C@10/20]:0+3 (ts: 14)"
- ),
+ "[C@5/15]:0+3+3 (ts: 14)", "[C@10/20]:0+3 (ts: 14)",
+
+ "[B@0/10]:0+2+2+2+1 (ts: 8)",
+ "[B@0/10]:0+2+2+2+1+2 (ts: 8)",
+ "[B@0/10]:0+2+2+2+1+2+3 (ts: 9)",
+ "[B@5/15]:0+2+2+2+2+3 (ts: 13)"
+
+ ),
supplier.theCapturedProcessor().processed
);
}
@@ -151,7 +162,7 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "B", "2", 1L));
driver.pipeInput(recordFactory.create(topic1, "C", "3", 2L));
driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L));
- driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 9L));
final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
@@ -160,10 +171,11 @@ public class KStreamWindowAggregateTest {
"[B@0/10]:0+2 (ts: 1)",
"[C@0/10]:0+3 (ts: 2)",
"[D@0/10]:0+4 (ts: 3)",
- "[A@0/10]:0+1+1 (ts: 4)"
+ "[A@0/10]:0+1+1 (ts: 9)",
+ "[A@5/15]:0+1 (ts: 9)"
);
- processors.get(1).checkAndClearProcessResult();
- processors.get(2).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(new String[0]);
+ processors.get(2).checkAndClearProcessResult(new String[0]);
driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L));
@@ -172,56 +184,54 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
processors.get(0).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1 (ts: 5)", "[A@5/15]:0+1 (ts: 5)",
+ "[A@0/10]:0+1+1+1 (ts: 9)", "[A@5/15]:0+1+1 (ts: 9)",
"[B@0/10]:0+2+2 (ts: 6)", "[B@5/15]:0+2 (ts: 6)",
"[D@0/10]:0+4+4 (ts: 7)", "[D@5/15]:0+4 (ts: 7)",
"[B@0/10]:0+2+2+2 (ts: 8)", "[B@5/15]:0+2+2 (ts: 8)",
"[C@0/10]:0+3+3 (ts: 9)", "[C@5/15]:0+3 (ts: 9)"
);
- processors.get(1).checkAndClearProcessResult();
- processors.get(2).checkAndClearProcessResult();
+ processors.get(1).checkAndClearProcessResult(new String[0]);
+ processors.get(2).checkAndClearProcessResult(new String[0]);
driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L));
driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L));
driver.pipeInput(recordFactory.create(topic2, "C", "c", 2L));
- driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L));
- driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 20L));
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 20L));
- processors.get(0).checkAndClearProcessResult();
+ processors.get(0).checkAndClearProcessResult(new String[0]);
processors.get(1).checkAndClearProcessResult(
"[A@0/10]:0+a (ts: 0)",
"[B@0/10]:0+b (ts: 1)",
"[C@0/10]:0+c (ts: 2)",
- "[D@0/10]:0+d (ts: 3)",
- "[A@0/10]:0+a+a (ts: 4)"
+ "[D@15/25]:0+d (ts: 20)",
+ "[D@20/30]:0+d (ts: 20)",
+ "[A@15/25]:0+a (ts: 20)",
+ "[A@20/30]:0+a (ts: 20)"
);
processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a (ts: 0)",
- "[B@0/10]:0+2+2+2%0+b (ts: 1)",
- "[C@0/10]:0+3+3%0+c (ts: 2)",
- "[D@0/10]:0+4+4%0+d (ts: 3)",
- "[A@0/10]:0+1+1+1%0+a+a (ts: 4)");
+ "[A@0/10]:0+1+1+1%0+a (ts: 9)",
+ "[B@0/10]:0+2+2+2%0+b (ts: 8)",
+ "[C@0/10]:0+3+3%0+c (ts: 9)");
driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
driver.pipeInput(recordFactory.create(topic2, "D", "d", 7L));
- driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L));
- driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L));
+ driver.pipeInput(recordFactory.create(topic2, "D", "d", 18L));
+ driver.pipeInput(recordFactory.create(topic2, "A", "a", 21L));
- processors.get(0).checkAndClearProcessResult();
+ processors.get(0).checkAndClearProcessResult(new String[0]);
processors.get(1).checkAndClearProcessResult(
- "[A@0/10]:0+a+a+a (ts: 5)", "[A@5/15]:0+a (ts: 5)",
+ "[A@0/10]:0+a+a (ts: 5)", "[A@5/15]:0+a (ts: 5)",
"[B@0/10]:0+b+b (ts: 6)", "[B@5/15]:0+b (ts: 6)",
- "[D@0/10]:0+d+d (ts: 7)", "[D@5/15]:0+d (ts: 7)",
- "[B@0/10]:0+b+b+b (ts: 8)", "[B@5/15]:0+b+b (ts: 8)",
- "[C@0/10]:0+c+c (ts: 9)", "[C@5/15]:0+c (ts: 9)"
+ "[D@0/10]:0+d (ts: 7)", "[D@5/15]:0+d (ts: 7)",
+ "[D@10/20]:0+d (ts: 18)", "[D@15/25]:0+d+d (ts: 20)",
+ "[A@15/25]:0+a+a (ts: 21)", "[A@20/30]:0+a+a (ts: 21)"
);
processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a+a+a (ts: 5)", "[A@5/15]:0+1%0+a (ts: 5)",
- "[B@0/10]:0+2+2+2%0+b+b (ts: 6)", "[B@5/15]:0+2+2%0+b (ts: 6)",
- "[D@0/10]:0+4+4%0+d+d (ts: 7)", "[D@5/15]:0+4%0+d (ts: 7)",
- "[B@0/10]:0+2+2+2%0+b+b+b (ts: 8)", "[B@5/15]:0+2+2%0+b+b (ts: 8)",
- "[C@0/10]:0+3+3%0+c+c (ts: 9)", "[C@5/15]:0+3%0+c (ts: 9)"
+ "[A@0/10]:0+1+1+1%0+a+a (ts: 9)", "[A@5/15]:0+1+1%0+a (ts: 9)",
+ "[B@0/10]:0+2+2+2%0+b+b (ts: 8)", "[B@5/15]:0+2+2%0+b (ts: 8)",
+ "[D@0/10]:0+4+4%0+d (ts: 7)", "[D@5/15]:0+4%0+d (ts: 7)"
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 87360a1..6144051 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -85,33 +85,41 @@ public class KTableAggregateTest {
driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
+ driver.setTime(10L);
driver.process(topic1, "A", "1");
driver.flushState();
+ driver.setTime(15L);
driver.process(topic1, "B", "2");
driver.flushState();
+ driver.setTime(20L);
driver.process(topic1, "A", "3");
driver.flushState();
+ driver.setTime(18L);
driver.process(topic1, "B", "4");
driver.flushState();
+ driver.setTime(5L);
driver.process(topic1, "C", "5");
driver.flushState();
+ driver.setTime(25L);
driver.process(topic1, "D", "6");
driver.flushState();
+ driver.setTime(15L);
driver.process(topic1, "B", "7");
driver.flushState();
+ driver.setTime(10L);
driver.process(topic1, "C", "8");
driver.flushState();
assertEquals(
asList(
- "A:0+1 (ts: 0)",
- "B:0+2 (ts: 0)",
- "A:0+1-1+3 (ts: 0)",
- "B:0+2-2+4 (ts: 0)",
- "C:0+5 (ts: 0)",
- "D:0+6 (ts: 0)",
- "B:0+2-2+4-4+7 (ts: 0)",
- "C:0+5-5+8 (ts: 0)"),
+ "A:0+1 (ts: 10)",
+ "B:0+2 (ts: 15)",
+ "A:0+1-1+3 (ts: 20)",
+ "B:0+2-2+4 (ts: 18)",
+ "C:0+5 (ts: 5)",
+ "D:0+6 (ts: 25)",
+ "B:0+2-2+4-4+7 (ts: 18)",
+ "C:0+5-5+8 (ts: 10)"),
supplier.theCapturedProcessor().processed);
}
@@ -136,12 +144,15 @@ public class KTableAggregateTest {
driver.setUp(builder, stateDir);
+ driver.setTime(10L);
driver.process(topic1, "A", "1");
+ driver.setTime(20L);
driver.process(topic1, "A", "3");
+ driver.setTime(15L);
driver.process(topic1, "A", "4");
driver.flushState();
- assertEquals(Collections.singletonList("A:0+4 (ts: 0)"), supplier.theCapturedProcessor().processed);
+ assertEquals(Collections.singletonList("A:0+4 (ts: 15)"), supplier.theCapturedProcessor().processed);
}
@Test
@@ -174,33 +185,41 @@ public class KTableAggregateTest {
driver.setUp(builder, stateDir);
+ driver.setTime(10L);
driver.process(topic1, "A", "1");
driver.flushState();
+ driver.setTime(15L);
driver.process(topic1, "A", null);
driver.flushState();
+ driver.setTime(12L);
driver.process(topic1, "A", "1");
driver.flushState();
+ driver.setTime(20L);
driver.process(topic1, "B", "2");
driver.flushState();
+ driver.setTime(25L);
driver.process(topic1, "null", "3");
driver.flushState();
+ driver.setTime(23L);
driver.process(topic1, "B", "4");
driver.flushState();
+ driver.setTime(24L);
driver.process(topic1, "NULL", "5");
driver.flushState();
+ driver.setTime(22L);
driver.process(topic1, "B", "7");
driver.flushState();
assertEquals(
asList(
- "1:0+1 (ts: 0)",
- "1:0+1-1 (ts: 0)",
- "1:0+1-1+1 (ts: 0)",
- "2:0+2 (ts: 0)",
+ "1:0+1 (ts: 10)",
+ "1:0+1-1 (ts: 15)",
+ "1:0+1-1+1 (ts: 15)",
+ "2:0+2 (ts: 20)",
//noop
- "2:0+2-2 (ts: 0)", "4:0+4 (ts: 0)",
+ "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
//noop
- "4:0+4-4 (ts: 0)", "7:0+7 (ts: 0)"),
+ "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
supplier.theCapturedProcessor().processed);
}
@@ -209,25 +228,30 @@ public class KTableAggregateTest {
final MockProcessorSupplier<String, Object> supplier) {
driver.setUp(builder, stateDir);
+ driver.setTime(10L);
driver.process(input, "A", "green");
driver.flushState();
+ driver.setTime(9L);
driver.process(input, "B", "green");
driver.flushState();
+ driver.setTime(12L);
driver.process(input, "A", "blue");
driver.flushState();
+ driver.setTime(15L);
driver.process(input, "C", "yellow");
driver.flushState();
+ driver.setTime(11L);
driver.process(input, "D", "green");
driver.flushState();
driver.flushState();
assertEquals(
asList(
- "green:1 (ts: 0)",
- "green:2 (ts: 0)",
- "green:1 (ts: 0)", "blue:1 (ts: 0)",
- "yellow:1 (ts: 0)",
- "green:2 (ts: 0)"),
+ "green:1 (ts: 10)",
+ "green:2 (ts: 10)",
+ "green:1 (ts: 12)", "blue:1 (ts: 12)",
+ "yellow:1 (ts: 15)",
+ "green:2 (ts: 12)"),
supplier.theCapturedProcessor().processed);
}
@@ -278,18 +302,23 @@ public class KTableAggregateTest {
final MockProcessor<String, Long> proc = supplier.theCapturedProcessor();
+ driver.setTime(10L);
driver.process(input, "A", "green");
+ driver.setTime(8L);
driver.process(input, "B", "green");
+ driver.setTime(9L);
driver.process(input, "A", "blue");
+ driver.setTime(10L);
driver.process(input, "C", "yellow");
+ driver.setTime(15L);
driver.process(input, "D", "green");
driver.flushState();
assertEquals(
asList(
- "blue:1 (ts: 0)",
- "yellow:1 (ts: 0)",
- "green:2 (ts: 0)"),
+ "blue:1 (ts: 9)",
+ "yellow:1 (ts: 10)",
+ "green:2 (ts: 15)"),
proc.processed);
}
@@ -319,21 +348,25 @@ public class KTableAggregateTest {
final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+ driver.setTime(10L);
driver.process(input, "11", "A");
driver.flushState();
+ driver.setTime(8L);
driver.process(input, "12", "B");
driver.flushState();
+ driver.setTime(12L);
driver.process(input, "11", null);
driver.flushState();
+ driver.setTime(6L);
driver.process(input, "12", "C");
driver.flushState();
assertEquals(
asList(
- "1:1 (ts: 0)",
- "1:12 (ts: 0)",
- "1:2 (ts: 0)",
- "1:2 (ts: 0)"),
+ "1:1 (ts: 10)",
+ "1:12 (ts: 10)",
+ "1:2 (ts: 12)",
+ "1:2 (ts: 12)"),
proc.processed);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 5904cd3..6996843 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
@@ -69,18 +70,18 @@ public class KTableFilterTest {
table3.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(topic, "A", 1));
- driver.pipeInput(recordFactory.create(topic, "B", 2));
- driver.pipeInput(recordFactory.create(topic, "C", 3));
- driver.pipeInput(recordFactory.create(topic, "D", 4));
- driver.pipeInput(recordFactory.create(topic, "A", null));
- driver.pipeInput(recordFactory.create(topic, "B", null));
+ driver.pipeInput(recordFactory.create(topic, "A", 1, 10L));
+ driver.pipeInput(recordFactory.create(topic, "B", 2, 5L));
+ driver.pipeInput(recordFactory.create(topic, "C", 3, 8L));
+ driver.pipeInput(recordFactory.create(topic, "D", 4, 14L));
+ driver.pipeInput(recordFactory.create(topic, "A", null, 18L));
+ driver.pipeInput(recordFactory.create(topic, "B", null, 15L));
}
final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
- processors.get(0).checkAndClearProcessResult("A:null (ts: 0)", "B:2 (ts: 0)", "C:null (ts: 0)", "D:4 (ts: 0)", "A:null (ts: 0)", "B:null (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:1 (ts: 0)", "B:null (ts: 0)", "C:3 (ts: 0)", "D:null (ts: 0)", "A:null (ts: 0)", "B:null (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:null (ts: 10)", "B:2 (ts: 5)", "C:null (ts: 8)", "D:4 (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
+ processors.get(1).checkAndClearProcessResult("A:1 (ts: 10)", "B:null (ts: 5)", "C:3 (ts: 8)", "D:null (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
}
@Test
@@ -136,41 +137,41 @@ public class KTableFilterTest {
getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
- driver.pipeInput(recordFactory.create(topic1, "A", 1));
- driver.pipeInput(recordFactory.create(topic1, "B", 1));
- driver.pipeInput(recordFactory.create(topic1, "C", 1));
+ driver.pipeInput(recordFactory.create(topic1, "A", 1, 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1, 15L));
assertNull(getter2.get("A"));
assertNull(getter2.get("B"));
assertNull(getter2.get("C"));
- assertEquals(1, (int) getter3.get("A"));
- assertEquals(1, (int) getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(1, 5L), getter3.get("A"));
+ assertEquals(ValueAndTimestamp.make(1, 10L), getter3.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 15L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", 2));
- driver.pipeInput(recordFactory.create(topic1, "B", 2));
+ driver.pipeInput(recordFactory.create(topic1, "A", 2, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2, 5L));
- assertEquals(2, (int) getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(2, 10L), getter2.get("A"));
+ assertEquals(ValueAndTimestamp.make(2, 5L), getter2.get("B"));
assertNull(getter2.get("C"));
assertNull(getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(1, 15L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", 3));
+ driver.pipeInput(recordFactory.create(topic1, "A", 3, 15L));
assertNull(getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(2, 5L), getter2.get("B"));
assertNull(getter2.get("C"));
- assertEquals(3, (int) getter3.get("A"));
+ assertEquals(ValueAndTimestamp.make(3, 15L), getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(1, 15L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", null));
- driver.pipeInput(recordFactory.create(topic1, "B", null));
+ driver.pipeInput(recordFactory.create(topic1, "A", null, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", null, 20L));
assertNull(getter2.get("A"));
assertNull(getter2.get("B"));
@@ -178,7 +179,7 @@ public class KTableFilterTest {
assertNull(getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(1, 15L), getter3.get("C"));
}
}
@@ -215,31 +216,31 @@ public class KTableFilterTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(topic1, "A", 1));
- driver.pipeInput(recordFactory.create(topic1, "B", 1));
- driver.pipeInput(recordFactory.create(topic1, "C", 1));
+ driver.pipeInput(recordFactory.create(topic1, "A", 1, 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1, 15L));
final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
- processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 0)", "B:(1<-null) (ts: 0)", "C:(1<-null) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 0)", "B:(null<-null) (ts: 0)", "C:(null<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 5)", "B:(null<-null) (ts: 10)", "C:(null<-null) (ts: 15)");
- driver.pipeInput(recordFactory.create(topic1, "A", 2));
- driver.pipeInput(recordFactory.create(topic1, "B", 2));
+ driver.pipeInput(recordFactory.create(topic1, "A", 2, 15L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2, 8L));
- processors.get(0).checkAndClearProcessResult("A:(2<-null) (ts: 0)", "B:(2<-null) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 0)", "B:(2<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
- driver.pipeInput(recordFactory.create(topic1, "A", 3));
+ driver.pipeInput(recordFactory.create(topic1, "A", 3, 20L));
- processors.get(0).checkAndClearProcessResult("A:(3<-null) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(3<-null) (ts: 20)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 20)");
- driver.pipeInput(recordFactory.create(topic1, "A", null));
- driver.pipeInput(recordFactory.create(topic1, "B", null));
+ driver.pipeInput(recordFactory.create(topic1, "A", null, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", null, 20L));
- processors.get(0).checkAndClearProcessResult("A:(null<-null) (ts: 0)", "B:(null<-null) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 0)", "B:(null<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
}
}
@@ -282,31 +283,31 @@ public class KTableFilterTest {
topology.addProcessor("proc2", supplier, table2.name);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- driver.pipeInput(recordFactory.create(topic1, "A", 1));
- driver.pipeInput(recordFactory.create(topic1, "B", 1));
- driver.pipeInput(recordFactory.create(topic1, "C", 1));
+ driver.pipeInput(recordFactory.create(topic1, "A", 1, 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1, 15L));
final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
- processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 0)", "B:(1<-null) (ts: 0)", "C:(1<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
processors.get(1).checkEmptyAndClearProcessResult();
- driver.pipeInput(recordFactory.create(topic1, "A", 2));
- driver.pipeInput(recordFactory.create(topic1, "B", 2));
+ driver.pipeInput(recordFactory.create(topic1, "A", 2, 15L));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2, 8L));
- processors.get(0).checkAndClearProcessResult("A:(2<-1) (ts: 0)", "B:(2<-1) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 0)", "B:(2<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(2<-1) (ts: 15)", "B:(2<-1) (ts: 8)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
- driver.pipeInput(recordFactory.create(topic1, "A", 3));
+ driver.pipeInput(recordFactory.create(topic1, "A", 3, 20L));
- processors.get(0).checkAndClearProcessResult("A:(3<-2) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("A:(null<-2) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(3<-2) (ts: 20)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-2) (ts: 20)");
- driver.pipeInput(recordFactory.create(topic1, "A", null));
- driver.pipeInput(recordFactory.create(topic1, "B", null));
+ driver.pipeInput(recordFactory.create(topic1, "A", null, 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", null, 20L));
- processors.get(0).checkAndClearProcessResult("A:(null<-3) (ts: 0)", "B:(null<-2) (ts: 0)");
- processors.get(1).checkAndClearProcessResult("B:(null<-2) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(null<-3) (ts: 10)", "B:(null<-2) (ts: 20)");
+ processors.get(1).checkAndClearProcessResult("B:(null<-2) (ts: 20)");
}
}
@@ -350,13 +351,13 @@ public class KTableFilterTest {
new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject"));
- driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject"));
- driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject"));
+ driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject", 5L));
+ driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject", 10L));
+ driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject", 20L));
}
final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
- processors.get(0).checkAndClearProcessResult("A:(reject<-null) (ts: 0)", "B:(reject<-null) (ts: 0)", "C:(reject<-null) (ts: 0)");
+ processors.get(0).checkAndClearProcessResult("A:(reject<-null) (ts: 5)", "B:(reject<-null) (ts: 10)", "C:(reject<-null) (ts: 20)");
processors.get(1).checkEmptyAndClearProcessResult();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 81ba31a..e9f688b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -103,17 +103,19 @@ public class KTableImplTest {
table4.toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- driver.pipeInput(recordFactory.create(topic1, "C", "03"));
- driver.pipeInput(recordFactory.create(topic1, "D", "04"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 100L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "03", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "04", 0L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "05", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "A", "06", 8L));
}
final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
- assertEquals(asList("A:01 (ts: 0)", "B:02 (ts: 0)", "C:03 (ts: 0)", "D:04 (ts: 0)"), processors.get(0).processed);
- assertEquals(asList("A:1 (ts: 0)", "B:2 (ts: 0)", "C:3 (ts: 0)", "D:4 (ts: 0)"), processors.get(1).processed);
- assertEquals(asList("A:null (ts: 0)", "B:2 (ts: 0)", "C:null (ts: 0)", "D:4 (ts: 0)"), processors.get(2).processed);
- assertEquals(asList("A:01 (ts: 0)", "B:02 (ts: 0)", "C:03 (ts: 0)", "D:04 (ts: 0)"), processors.get(3).processed);
+ assertEquals(asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"), processors.get(0).processed);
+ assertEquals(asList("A:1 (ts: 5)", "B:2 (ts: 100)", "C:3 (ts: 0)", "D:4 (ts: 0)", "A:5 (ts: 10)", "A:6 (ts: 8)"), processors.get(1).processed);
+ assertEquals(asList("A:null (ts: 5)", "B:2 (ts: 100)", "C:null (ts: 0)", "D:4 (ts: 0)", "A:null (ts: 10)", "A:6 (ts: 8)"), processors.get(2).processed);
+ assertEquals(asList("A:01 (ts: 5)", "B:02 (ts: 100)", "C:03 (ts: 0)", "D:04 (ts: 0)", "A:05 (ts: 10)", "A:06 (ts: 8)"), processors.get(3).processed);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index f02d5dd..195e38d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -48,13 +48,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class KTableKTableInnerJoinTest {
+ private final static String[] EMPTY = new String[0];
private final String topic1 = "topic1";
private final String topic2 = "topic2";
- final private String output = "output";
-
+ private final String output = "output";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.with(Serdes.Integer(), Serdes.String());
@@ -134,6 +135,106 @@ public class KTableKTableInnerJoinTest {
doTestNotSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined);
}
+ @Test
+ public void testSendingOldValues() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+ final KTable<Integer, String> table1;
+ final KTable<Integer, String> table2;
+ final KTable<Integer, String> joined;
+ final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
+ joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
+
+ ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+
+ builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
+
+ assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+ assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+ assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+
+ // push two items to the primary stream. the other table is empty
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
+ }
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult(EMPTY);
+
+ // push two items to the other stream. this should produce two items.
+ for (int i = 0; i < 2; i++) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
+ }
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
+
+ // push all four items to the primary stream. this should produce two items.
+ for (final int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
+ }
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)");
+
+ // push all items to the other stream. this should produce four items.
+ for (final int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
+ }
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
+ "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
+
+ // push all four items to the primary stream. this should produce four items.
+ for (final int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
+ }
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
+ "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
+
+ // push two items with null to the other stream as deletes. this should produce two item.
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(null<-XXX0+YY0) (ts: 6)", "1:(null<-XXX1+YY1) (ts: 7)");
+
+ // push all four items to the primary stream. this should produce two items.
+ for (final int expectedKey : expectedKeys) {
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
+ }
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce two items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("2:(null<-XXXX2+YY2) (ts: 10)", "3:(null<-XXXX3+YY3) (ts: 20)");
+ }
+ }
+
@SuppressWarnings("unchecked")
@Test
public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
@@ -157,11 +258,11 @@ public class KTableKTableInnerJoinTest {
}
private void doTestNotSendingOldValues(final StreamsBuilder builder,
- final int[] expectedKeys,
- final KTable<Integer, String> table1,
- final KTable<Integer, String> table2,
- final MockProcessorSupplier<Integer, String> supplier,
- final KTable<Integer, String> joined) {
+ final int[] expectedKeys,
+ final KTable<Integer, String> table1,
+ final KTable<Integer, String> table2,
+ final MockProcessorSupplier<Integer, String> supplier,
+ final KTable<Integer, String> joined) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor();
@@ -172,45 +273,75 @@ public class KTableKTableInnerJoinTest {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
- proc.checkAndClearProcessResult();
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult(EMPTY);
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 0)", "1:(X1+Y1<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
// push all four items to the primary stream. this should produce two items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- proc.checkAndClearProcessResult("0:(XX0+Y0<-null) (ts: 0)", "1:(XX1+Y1<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)");
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- proc.checkAndClearProcessResult("0:(XX0+YY0<-null) (ts: 0)", "1:(XX1+YY1<-null) (ts: 0)", "2:(XX2+YY2<-null) (ts: 0)", "3:(XX3+YY3<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
+ "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-null) (ts: 0)", "1:(X1+YY1<-null) (ts: 0)", "2:(X2+YY2<-null) (ts: 0)", "3:(X3+YY3<-null) (ts: 0)");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
+ "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("0:(null<-null) (ts: 0)", "1:(null<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(null<-null) (ts: 6)", "1:(null<-null) (ts: 7)");
// push all four items to the primary stream. this should produce two items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- proc.checkAndClearProcessResult("2:(XX2+YY2<-null) (ts: 0)", "3:(XX3+YY3<-null) (ts: 0)");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce two items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("2:(null<-null) (ts: 10)", "3:(null<-null) (ts: 20)");
}
}
@@ -224,75 +355,101 @@ public class KTableKTableInnerJoinTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
assertNull(driver.readOutput(output));
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
- assertOutputKeyValue(driver, 0, "X0+Y0");
- assertOutputKeyValue(driver, 1, "X1+Y1");
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "X0+Y0", 5L);
+ assertOutputKeyValueTimestamp(driver, 1, "X1+Y1", 10L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce two items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- assertOutputKeyValue(driver, 0, "XX0+Y0");
- assertOutputKeyValue(driver, 1, "XX1+Y1");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+Y0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+Y1", 10L);
assertNull(driver.readOutput(output));
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- assertOutputKeyValue(driver, 0, "XX0+YY0");
- assertOutputKeyValue(driver, 1, "XX1+YY1");
- assertOutputKeyValue(driver, 2, "XX2+YY2");
- assertOutputKeyValue(driver, 3, "XX3+YY3");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+YY0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+YY1", 7L);
+ assertOutputKeyValueTimestamp(driver, 2, "XX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- assertOutputKeyValue(driver, 0, "X0+YY0");
- assertOutputKeyValue(driver, 1, "X1+YY1");
- assertOutputKeyValue(driver, 2, "X2+YY2");
- assertOutputKeyValue(driver, 3, "X3+YY3");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXX0+YY0", 6L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXX1+YY1", 6L);
+ assertOutputKeyValueTimestamp(driver, 2, "XXX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- assertOutputKeyValue(driver, 0, null);
- assertOutputKeyValue(driver, 1, null);
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, null, 6L);
+ assertOutputKeyValueTimestamp(driver, 1, null, 7L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce two items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- assertOutputKeyValue(driver, 2, "XX2+YY2");
- assertOutputKeyValue(driver, 3, "XX3+YY3");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 2, "XXXX2+YY2", 13L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXXX3+YY3", 15L);
assertNull(driver.readOutput(output));
- driver.pipeInput(recordFactory.create(topic1, null, "XX" + 1));
+ // push fourt items to the primary stream with null. this should produce two items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 2, null, 10L);
+ assertOutputKeyValueTimestamp(driver, 3, null, 20L);
assertNull(driver.readOutput(output));
}
}
- private void assertOutputKeyValue(final TopologyTestDriver driver,
- final Integer expectedKey,
- final String expectedValue) {
- OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue);
+ private void assertOutputKeyValueTimestamp(final TopologyTestDriver driver,
+ final Integer expectedKey,
+ final String expectedValue,
+ final long expectedTimestamp) {
+ OutputVerifier.compareKeyValueTimestamp(
+ driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()),
+ expectedKey,
+ expectedValue,
+ expectedTimestamp);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 3ec03f4..92ff514 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -58,10 +58,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class KTableKTableLeftJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
- final private String output = "output";
-
+ private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
+ private final String output = "output";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer(), 0L);
@@ -87,70 +86,96 @@ public class KTableKTableLeftJoinTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
- assertOutputKeyValue(driver, 0, "X0+null");
- assertOutputKeyValue(driver, 1, "X1+null");
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ assertOutputKeyValueTimestamp(driver, 0, "X0+null", 5L);
+ assertOutputKeyValueTimestamp(driver, 1, "X1+null", 6L);
assertNull(driver.readOutput(output));
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
- assertOutputKeyValue(driver, 0, "X0+Y0");
- assertOutputKeyValue(driver, 1, "X1+Y1");
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "X0+Y0", 5L);
+ assertOutputKeyValueTimestamp(driver, 1, "X1+Y1", 10L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- assertOutputKeyValue(driver, 0, "X0+Y0");
- assertOutputKeyValue(driver, 1, "X1+Y1");
- assertOutputKeyValue(driver, 2, "X2+null");
- assertOutputKeyValue(driver, 3, "X3+null");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+Y0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+Y1", 10L);
+ assertOutputKeyValueTimestamp(driver, 2, "XX2+null", 7L);
+ assertOutputKeyValueTimestamp(driver, 3, "XX3+null", 7L);
assertNull(driver.readOutput(output));
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- assertOutputKeyValue(driver, 0, "X0+YY0");
- assertOutputKeyValue(driver, 1, "X1+YY1");
- assertOutputKeyValue(driver, 2, "X2+YY2");
- assertOutputKeyValue(driver, 3, "X3+YY3");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+YY0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+YY1", 7L);
+ assertOutputKeyValueTimestamp(driver, 2, "XX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- assertOutputKeyValue(driver, 0, "X0+YY0");
- assertOutputKeyValue(driver, 1, "X1+YY1");
- assertOutputKeyValue(driver, 2, "X2+YY2");
- assertOutputKeyValue(driver, 3, "X3+YY3");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXX0+YY0", 6L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXX1+YY1", 6L);
+ assertOutputKeyValueTimestamp(driver, 2, "XXX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- assertOutputKeyValue(driver, 0, "X0+null");
- assertOutputKeyValue(driver, 1, "X1+null");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXX0+null", 6L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXX1+null", 7L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- assertOutputKeyValue(driver, 0, "XX0+null");
- assertOutputKeyValue(driver, 1, "XX1+null");
- assertOutputKeyValue(driver, 2, "XX2+YY2");
- assertOutputKeyValue(driver, 3, "XX3+YY3");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXXX0+null", 13L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXXX1+null", 13L);
+ assertOutputKeyValueTimestamp(driver, 2, "XXXX2+YY2", 13L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXXX3+YY3", 15L);
+ assertNull(driver.readOutput(output));
+
+ // push three items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, null, 0L);
+ assertOutputKeyValueTimestamp(driver, 1, null, 42L);
+ assertOutputKeyValueTimestamp(driver, 2, null, 10L);
+ assertOutputKeyValueTimestamp(driver, 3, null, 20L);
assertNull(driver.readOutput(output));
}
}
@@ -182,45 +207,81 @@ public class KTableKTableLeftJoinTest {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 0)", "1:(X1+Y1<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 0)", "1:(X1+Y1<-null) (ts: 0)", "2:(X2+null<-null) (ts: 0)", "3:(X3+null<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)",
+ "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-null) (ts: 0)", "1:(X1+YY1<-null) (ts: 0)", "2:(X2+YY2<-null) (ts: 0)", "3:(X3+YY3<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
+ "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-null) (ts: 0)", "1:(X1+YY1<-null) (ts: 0)", "2:(X2+YY2<-null) (ts: 0)", "3:(X3+YY3<-null) (ts: 0)");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
+ "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(XXX0+null<-null) (ts: 6)", "1:(XXX1+null<-null) (ts: 7)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- proc.checkAndClearProcessResult("0:(XX0+null<-null) (ts: 0)", "1:(XX1+null<-null) (ts: 0)", "2:(XX2+YY2<-null) (ts: 0)", "3:(XX3+YY3<-null) (ts: 0)");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXXX0+null<-null) (ts: 13)", "1:(XXXX1+null<-null) (ts: 13)",
+ "2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(null<-null) (ts: 0)", "1:(null<-null) (ts: 42)",
+ "2:(null<-null) (ts: 10)", "3:(null<-null) (ts: 20)");
}
}
@@ -253,45 +314,81 @@ public class KTableKTableLeftJoinTest {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 0)", "1:(X1+Y1<-X1+null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 5)", "1:(X1+Y1<-X1+null) (ts: 10)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0) (ts: 0)", "1:(X1+Y1<-X1+Y1) (ts: 0)", "2:(X2+null<-null) (ts: 0)", "3:(X3+null<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)",
+ "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0) (ts: 0)", "1:(X1+YY1<-X1+Y1) (ts: 0)", "2:(X2+YY2<-X2+null) (ts: 0)", "3:(X3+YY3<-X3+null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
+ "2:(XX2+YY2<-XX2+null) (ts: 10)", "3:(XX3+YY3<-XX3+null) (ts: 15)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0) (ts: 0)", "1:(X1+YY1<-X1+YY1) (ts: 0)", "2:(X2+YY2<-X2+YY2) (ts: 0)", "3:(X3+YY3<-X3+YY3) (ts: 0)");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
+ "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0) (ts: 0)", "1:(X1+null<-X1+YY1) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(XXX0+null<-XXX0+YY0) (ts: 6)", "1:(XXX1+null<-XXX1+YY1) (ts: 7)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- proc.checkAndClearProcessResult("0:(XX0+null<-X0+null) (ts: 0)", "1:(XX1+null<-X1+null) (ts: 0)", "2:(XX2+YY2<-X2+YY2) (ts: 0)", "3:(XX3+YY3<-X3+YY3) (ts: 0)");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXXX0+null<-XXX0+null) (ts: 13)", "1:(XXXX1+null<-XXX1+null) (ts: 13)",
+ "2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(null<-XXXX0+null) (ts: 0)", "1:(null<-XXXX1+null) (ts: 42)",
+ "2:(null<-XXXX2+YY2) (ts: 10)", "3:(null<-XXXX3+YY3) (ts: 20)");
}
}
@@ -403,9 +500,14 @@ public class KTableKTableLeftJoinTest {
assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
}
- private void assertOutputKeyValue(final TopologyTestDriver driver,
- final Integer expectedKey,
- final String expectedValue) {
- OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue);
+ private void assertOutputKeyValueTimestamp(final TopologyTestDriver driver,
+ final Integer expectedKey,
+ final String expectedValue,
+ final long expectedTimestamp) {
+ OutputVerifier.compareKeyValueTimestamp(
+ driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()),
+ expectedKey,
+ expectedValue,
+ expectedTimestamp);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 93a055d..38c90ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -49,10 +49,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class KTableKTableOuterJoinTest {
- final private String topic1 = "topic1";
- final private String topic2 = "topic2";
- final private String output = "output";
-
+ private final String topic1 = "topic1";
+ private final String topic2 = "topic2";
+ private final String output = "output";
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer(), 0L);
@@ -82,78 +81,96 @@ public class KTableKTableOuterJoinTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
- assertOutputKeyValue(driver, 0, "X0+null");
- assertOutputKeyValue(driver, 1, "X1+null");
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ assertOutputKeyValueTimestamp(driver, 0, "X0+null", 5L);
+ assertOutputKeyValueTimestamp(driver, 1, "X1+null", 6L);
assertNull(driver.readOutput(output));
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
// pass tuple with null key, it will be discarded in join process
- driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
- assertOutputKeyValue(driver, 0, "X0+Y0");
- assertOutputKeyValue(driver, 1, "X1+Y1");
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "X0+Y0", 5L);
+ assertOutputKeyValueTimestamp(driver, 1, "X1+Y1", 10L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- assertOutputKeyValue(driver, 0, "X0+Y0");
- assertOutputKeyValue(driver, 1, "X1+Y1");
- assertOutputKeyValue(driver, 2, "X2+null");
- assertOutputKeyValue(driver, 3, "X3+null");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+Y0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+Y1", 10L);
+ assertOutputKeyValueTimestamp(driver, 2, "XX2+null", 7L);
+ assertOutputKeyValueTimestamp(driver, 3, "XX3+null", 7L);
assertNull(driver.readOutput(output));
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- assertOutputKeyValue(driver, 0, "X0+YY0");
- assertOutputKeyValue(driver, 1, "X1+YY1");
- assertOutputKeyValue(driver, 2, "X2+YY2");
- assertOutputKeyValue(driver, 3, "X3+YY3");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XX0+YY0", 7L);
+ assertOutputKeyValueTimestamp(driver, 1, "XX1+YY1", 7L);
+ assertOutputKeyValueTimestamp(driver, 2, "XX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- assertOutputKeyValue(driver, 0, "X0+YY0");
- assertOutputKeyValue(driver, 1, "X1+YY1");
- assertOutputKeyValue(driver, 2, "X2+YY2");
- assertOutputKeyValue(driver, 3, "X3+YY3");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXX0+YY0", 6L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXX1+YY1", 6L);
+ assertOutputKeyValueTimestamp(driver, 2, "XXX2+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXX3+YY3", 15L);
assertNull(driver.readOutput(output));
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- assertOutputKeyValue(driver, 0, "X0+null");
- assertOutputKeyValue(driver, 1, "X1+null");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXX0+null", 6L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXX1+null", 7L);
assertNull(driver.readOutput(output));
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- assertOutputKeyValue(driver, 0, "XX0+null");
- assertOutputKeyValue(driver, 1, "XX1+null");
- assertOutputKeyValue(driver, 2, "XX2+YY2");
- assertOutputKeyValue(driver, 3, "XX3+YY3");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, "XXXX0+null", 13L);
+ assertOutputKeyValueTimestamp(driver, 1, "XXXX1+null", 13L);
+ assertOutputKeyValueTimestamp(driver, 2, "XXXX2+YY2", 13L);
+ assertOutputKeyValueTimestamp(driver, 3, "XXXX3+YY3", 15L);
assertNull(driver.readOutput(output));
- // push middle two items to the primary stream with null. this should produce two items.
- for (int i = 1; i < 3; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null));
- }
- assertOutputKeyValue(driver, 1, null);
- assertOutputKeyValue(driver, 2, "null+YY2");
+ // push four items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ assertOutputKeyValueTimestamp(driver, 0, null, 0L);
+ assertOutputKeyValueTimestamp(driver, 1, null, 42L);
+ assertOutputKeyValueTimestamp(driver, 2, "null+YY2", 10L);
+ assertOutputKeyValueTimestamp(driver, 3, "null+YY3", 20L);
assertNull(driver.readOutput(output));
}
}
@@ -185,51 +202,81 @@ public class KTableKTableOuterJoinTest {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 0)", "1:(X1+Y1<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 5)", "1:(X1+Y1<-null) (ts: 10)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-null) (ts: 0)", "1:(X1+Y1<-null) (ts: 0)", "2:(X2+null<-null) (ts: 0)", "3:(X3+null<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+Y0<-null) (ts: 7)", "1:(XX1+Y1<-null) (ts: 10)",
+ "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-null) (ts: 0)", "1:(X1+YY1<-null) (ts: 0)", "2:(X2+YY2<-null) (ts: 0)", "3:(X3+YY3<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-null) (ts: 7)", "1:(XX1+YY1<-null) (ts: 7)",
+ "2:(XX2+YY2<-null) (ts: 10)", "3:(XX3+YY3<-null) (ts: 15)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-null) (ts: 0)", "1:(X1+YY1<-null) (ts: 0)", "2:(X2+YY2<-null) (ts: 0)", "3:(X3+YY3<-null) (ts: 0)");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-null) (ts: 6)", "1:(XXX1+YY1<-null) (ts: 6)",
+ "2:(XXX2+YY2<-null) (ts: 10)", "3:(XXX3+YY3<-null) (ts: 15)");
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(XXX0+null<-null) (ts: 6)", "1:(XXX1+null<-null) (ts: 7)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- proc.checkAndClearProcessResult("0:(XX0+null<-null) (ts: 0)", "1:(XX1+null<-null) (ts: 0)", "2:(XX2+YY2<-null) (ts: 0)", "3:(XX3+YY3<-null) (ts: 0)");
-
- // push middle two items to the primary stream with null. this should produce two items.
- for (int i = 1; i < 3; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("1:(null<-null) (ts: 0)", "2:(null+YY2<-null) (ts: 0)");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXXX0+null<-null) (ts: 13)", "1:(XXXX1+null<-null) (ts: 13)",
+ "2:(XXXX2+YY2<-null) (ts: 13)", "3:(XXXX3+YY3<-null) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(null<-null) (ts: 0)", "1:(null<-null) (ts: 42)",
+ "2:(null+YY2<-null) (ts: 10)", "3:(null+YY3<-null) (ts: 20)");
}
}
@@ -262,51 +309,81 @@ public class KTableKTableOuterJoinTest {
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], 5L + i));
}
- proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 0)", "1:(X1+null<-null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic1, null, "SomeVal", 42L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right:
+ proc.checkAndClearProcessResult("0:(X0+null<-null) (ts: 5)", "1:(X1+null<-null) (ts: 6)");
// push two items to the other stream. this should produce two items.
for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i]));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i], 10L * i));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 0)", "1:(X1+Y1<-X1+null) (ts: 0)");
+ // pass tuple with null key, it will be discarded in join process
+ driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal", 73L));
+ // left: X0:0 (ts: 5), X1:1 (ts: 6)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null) (ts: 5)", "1:(X1+Y1<-X1+null) (ts: 10)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, 7L));
}
- proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0) (ts: 0)", "1:(X1+Y1<-X1+Y1) (ts: 0)", "2:(X2+null<-null) (ts: 0)", "3:(X3+null<-null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: Y0:0 (ts: 0), Y1:1 (ts: 10)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+Y0<-X0+Y0) (ts: 7)", "1:(XX1+Y1<-X1+Y1) (ts: 10)",
+ "2:(XX2+null<-null) (ts: 7)", "3:(XX3+null<-null) (ts: 7)");
// push all items to the other stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey, expectedKey * 5L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0) (ts: 0)", "1:(X1+YY1<-X1+Y1) (ts: 0)", "2:(X2+YY2<-X2+null) (ts: 0)", "3:(X3+YY3<-X3+null) (ts: 0)");
+ // left: XX0:0 (ts: 7), XX1:1 (ts: 7), XX2:2 (ts: 7), XX3:3 (ts: 7)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XX0+YY0<-XX0+Y0) (ts: 7)", "1:(XX1+YY1<-XX1+Y1) (ts: 7)",
+ "2:(XX2+YY2<-XX2+null) (ts: 10)", "3:(XX3+YY3<-XX3+null) (ts: 15)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXX" + expectedKey, 6L));
}
- proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0) (ts: 0)", "1:(X1+YY1<-X1+YY1) (ts: 0)", "2:(X2+YY2<-X2+YY2) (ts: 0)", "3:(X3+YY3<-X3+YY3) (ts: 0)");
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY0:0 (ts: 0), YY1:1 (ts: 5), YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXX0+YY0<-XX0+YY0) (ts: 6)", "1:(XXX1+YY1<-XX1+YY1) (ts: 6)",
+ "2:(XXX2+YY2<-XX2+YY2) (ts: 10)", "3:(XXX3+YY3<-XX3+YY3) (ts: 15)");
// push two items with null to the other stream as deletes. this should produce two item.
- for (int i = 0; i < 2; i++) {
- driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null));
- }
- proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0) (ts: 0)", "1:(X1+null<-X1+YY1) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[0], null, 5L));
+ driver.pipeInput(recordFactory.create(topic2, expectedKeys[1], null, 7L));
+ // left: XXX0:0 (ts: 6), XXX1:1 (ts: 6), XXX2:2 (ts: 6), XXX3:3 (ts: 6)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult("0:(XXX0+null<-XXX0+YY0) (ts: 6)", "1:(XXX1+null<-XXX1+YY1) (ts: 7)");
// push all four items to the primary stream. this should produce four items.
for (final int expectedKey : expectedKeys) {
- driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey));
- }
- proc.checkAndClearProcessResult("0:(XX0+null<-X0+null) (ts: 0)", "1:(XX1+null<-X1+null) (ts: 0)", "2:(XX2+YY2<-X2+YY2) (ts: 0)", "3:(XX3+YY3<-X3+YY3) (ts: 0)");
-
- // push middle two items to the primary stream with null. this should produce two items.
- for (int i = 1; i < 3; i++) {
- driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null));
+ driver.pipeInput(recordFactory.create(topic1, expectedKey, "XXXX" + expectedKey, 13L));
}
- proc.checkAndClearProcessResult("1:(null<-XX1+null) (ts: 0)", "2:(null+YY2<-XX2+YY2) (ts: 0)");
+ // left: XXXX0:0 (ts: 13), XXXX1:1 (ts: 13), XXXX2:2 (ts: 13), XXXX3:3 (ts: 13)
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(XXXX0+null<-XXX0+null) (ts: 13)", "1:(XXXX1+null<-XXX1+null) (ts: 13)",
+ "2:(XXXX2+YY2<-XXX2+YY2) (ts: 13)", "3:(XXXX3+YY3<-XXX3+YY3) (ts: 15)");
+
+ // push four items to the primary stream with null. this should produce four items.
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[0], null, 0L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[1], null, 42L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[2], null, 5L));
+ driver.pipeInput(recordFactory.create(topic1, expectedKeys[3], null, 20L));
+ // left:
+ // right: YY2:2 (ts: 10), YY3:3 (ts: 15)
+ proc.checkAndClearProcessResult(
+ "0:(null<-XXXX0+null) (ts: 0)", "1:(null<-XXXX1+null) (ts: 42)",
+ "2:(null+YY2<-XXXX2+YY2) (ts: 10)", "3:(null+YY3<-XXXX3+YY3) (ts: 20)");
}
}
@@ -332,10 +409,15 @@ public class KTableKTableOuterJoinTest {
assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
}
- private void assertOutputKeyValue(final TopologyTestDriver driver,
- final Integer expectedKey,
- final String expectedValue) {
- OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue);
+ private void assertOutputKeyValueTimestamp(final TopologyTestDriver driver,
+ final Integer expectedKey,
+ final String expectedValue,
+ final long expectedTimestamp) {
+ OutputVerifier.compareKeyValueTimestamp(
+ driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()),
+ expectedKey,
+ expectedValue,
+ expectedTimestamp);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 5a008d5..8b6f27a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -54,7 +54,7 @@ public class KTableMapKeysTest {
final KStream<String, String> convertedStream = table1.toStream((key, value) -> keyMap.get(key));
- final String[] expected = new String[]{"ONE:V_ONE (ts: 0)", "TWO:V_TWO (ts: 0)", "THREE:V_THREE (ts: 0)"};
+ final String[] expected = new String[]{"ONE:V_ONE (ts: 5)", "TWO:V_TWO (ts: 10)", "THREE:V_THREE (ts: 15)"};
final int[] originalKeys = new int[]{1, 2, 3};
final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"};
@@ -63,7 +63,7 @@ public class KTableMapKeysTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
for (int i = 0; i < originalKeys.length; i++) {
- driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i]));
+ driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i], 5 + i * 5));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index b472b2d..9b78291 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,11 +55,11 @@ public class KTableMapValuesTest {
final String topic1,
final MockProcessorSupplier<String, Integer> supplier) {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(recordFactory.create(topic1, "A", "1"));
- driver.pipeInput(recordFactory.create(topic1, "B", "2"));
- driver.pipeInput(recordFactory.create(topic1, "C", "3"));
- driver.pipeInput(recordFactory.create(topic1, "D", "4"));
- assertEquals(asList("A:1 (ts: 0)", "B:2 (ts: 0)", "C:3 (ts: 0)", "D:4 (ts: 0)"), supplier.theCapturedProcessor().processed);
+ driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2", 25L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3", 20L));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4", 10L));
+ assertEquals(asList("A:1 (ts: 5)", "B:2 (ts: 25)", "C:3 (ts: 20)", "D:4 (ts: 10)"), supplier.theCapturedProcessor().processed);
}
}
@@ -115,48 +116,48 @@ public class KTableMapValuesTest {
getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 50L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 30L));
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals(ValueAndTimestamp.make(1, 50L), getter2.get("A"));
+ assertEquals(ValueAndTimestamp.make(1, 10L), getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 30L), getter2.get("C"));
- assertEquals(new Integer(-1), getter3.get("A"));
- assertEquals(new Integer(-1), getter3.get("B"));
- assertEquals(new Integer(-1), getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(-1, 50L), getter3.get("A"));
+ assertEquals(ValueAndTimestamp.make(-1, 10L), getter3.get("B"));
+ assertEquals(ValueAndTimestamp.make(-1, 30L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 25L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 20L));
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals(ValueAndTimestamp.make(2, 25L), getter2.get("A"));
+ assertEquals(ValueAndTimestamp.make(2, 20L), getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 30L), getter2.get("C"));
- assertEquals(new Integer(-2), getter3.get("A"));
- assertEquals(new Integer(-2), getter3.get("B"));
- assertEquals(new Integer(-1), getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(-2, 25L), getter3.get("A"));
+ assertEquals(ValueAndTimestamp.make(-2, 20L), getter3.get("B"));
+ assertEquals(ValueAndTimestamp.make(-1, 30L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 35L));
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals(ValueAndTimestamp.make(3, 35L), getter2.get("A"));
+ assertEquals(ValueAndTimestamp.make(2, 20L), getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 30L), getter2.get("C"));
- assertEquals(new Integer(-3), getter3.get("A"));
- assertEquals(new Integer(-2), getter3.get("B"));
- assertEquals(new Integer(-1), getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(-3, 35L), getter3.get("A"));
+ assertEquals(ValueAndTimestamp.make(-2, 20L), getter3.get("B"));
+ assertEquals(ValueAndTimestamp.make(-1, 30L), getter3.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 1L));
assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals(ValueAndTimestamp.make(2, 20L), getter2.get("B"));
+ assertEquals(ValueAndTimestamp.make(1, 30L), getter2.get("C"));
assertNull(getter3.get("A"));
- assertEquals(new Integer(-2), getter3.get("B"));
- assertEquals(new Integer(-1), getter3.get("C"));
+ assertEquals(ValueAndTimestamp.make(-2, 20L), getter3.get("B"));
+ assertEquals(ValueAndTimestamp.make(-1, 30L), getter3.get("C"));
}
}
@@ -208,20 +209,20 @@ public class KTableMapValuesTest {
assertFalse(table1.sendingOldValueEnabled());
assertFalse(table2.sendingOldValueEnabled());
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc.checkAndClearProcessResult("A:(1<-null) (ts: 0)", "B:(1<-null) (ts: 0)", "C:(1<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
+ proc.checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc.checkAndClearProcessResult("A:(2<-null) (ts: 0)", "B:(2<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 8L));
+ proc.checkAndClearProcessResult("A:(2<-null) (ts: 10)", "B:(2<-null) (ts: 8)");
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc.checkAndClearProcessResult("A:(3<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 20L));
+ proc.checkAndClearProcessResult("A:(3<-null) (ts: 20)");
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- proc.checkAndClearProcessResult("A:(null<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 30L));
+ proc.checkAndClearProcessResult("A:(null<-null) (ts: 30)");
}
}
@@ -245,20 +246,20 @@ public class KTableMapValuesTest {
assertTrue(table1.sendingOldValueEnabled());
assertTrue(table2.sendingOldValueEnabled());
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc.checkAndClearProcessResult("A:(1<-null) (ts: 0)", "B:(1<-null) (ts: 0)", "C:(1<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 5L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
+ proc.checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc.checkAndClearProcessResult("A:(2<-1) (ts: 0)", "B:(2<-1) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 8L));
+ proc.checkAndClearProcessResult("A:(2<-1) (ts: 10)", "B:(2<-1) (ts: 8)");
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc.checkAndClearProcessResult("A:(3<-2) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 20L));
+ proc.checkAndClearProcessResult("A:(3<-2) (ts: 20)");
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- proc.checkAndClearProcessResult("A:(null<-3) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 30L));
+ proc.checkAndClearProcessResult("A:(null<-3) (ts: 30)");
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 600f3db..b360151 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -37,7 +36,7 @@ public class KTableReduceTest {
@Test
public void shouldAddAndSubtract() {
- final AbstractProcessorContext context = new InternalMockProcessorContext();
+ final InternalMockProcessorContext context = new InternalMockProcessorContext();
final Processor<String, Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>(
@@ -53,12 +52,15 @@ public class KTableReduceTest {
reduceProcessor.init(context);
context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
+ context.setTime(10L);
reduceProcessor.process("A", new Change<>(singleton("a"), null));
- assertEquals(ValueAndTimestamp.make(singleton("a"), -1L), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(singleton("a"), 10L), myStore.get("A"));
+ context.setTime(15L);
reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
- assertEquals(ValueAndTimestamp.make(singleton("b"), -1L), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(singleton("b"), 15L), myStore.get("A"));
+ context.setTime(12L);
reduceProcessor.process("A", new Change<>(null, singleton("b")));
- assertEquals(ValueAndTimestamp.make(emptySet(), -1L), myStore.get("A"));
+ assertEquals(ValueAndTimestamp.make(emptySet(), 15L), myStore.get("A"));
}
private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index aa0c8d7..848b290 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -64,16 +65,16 @@ public class KTableSourceTest {
final ConsumerRecordFactory<String, Integer> integerFactory =
new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer(), 0L);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.pipeInput(integerFactory.create(topic1, "A", 1));
- driver.pipeInput(integerFactory.create(topic1, "B", 2));
- driver.pipeInput(integerFactory.create(topic1, "C", 3));
- driver.pipeInput(integerFactory.create(topic1, "D", 4));
- driver.pipeInput(integerFactory.create(topic1, "A", null));
- driver.pipeInput(integerFactory.create(topic1, "B", null));
+ driver.pipeInput(integerFactory.create(topic1, "A", 1, 10L));
+ driver.pipeInput(integerFactory.create(topic1, "B", 2, 11L));
+ driver.pipeInput(integerFactory.create(topic1, "C", 3, 12L));
+ driver.pipeInput(integerFactory.create(topic1, "D", 4, 13L));
+ driver.pipeInput(integerFactory.create(topic1, "A", null, 14L));
+ driver.pipeInput(integerFactory.create(topic1, "B", null, 15L));
}
assertEquals(
- asList("A:1 (ts: 0)", "B:2 (ts: 0)", "C:3 (ts: 0)", "D:4 (ts: 0)", "A:null (ts: 0)", "B:null (ts: 0)"),
+ asList("A:1 (ts: 10)", "B:2 (ts: 11)", "C:3 (ts: 12)", "D:4 (ts: 13)", "A:null (ts: 14)", "B:null (ts: 15)"),
supplier.theCapturedProcessor().processed);
}
@@ -112,33 +113,33 @@ public class KTableSourceTest {
final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals(ValueAndTimestamp.make("01", 10L), getter1.get("A"));
+ assertEquals(ValueAndTimestamp.make("01", 20L), getter1.get("B"));
+ assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 30L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 5L));
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals(ValueAndTimestamp.make("02", 30L), getter1.get("A"));
+ assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
+ assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 29L));
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals(ValueAndTimestamp.make("03", 29L), getter1.get("A"));
+ assertEquals(ValueAndTimestamp.make("02", 5L), getter1.get("B"));
+ assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 50L));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 3L));
assertNull(getter1.get("A"));
assertNull(getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals(ValueAndTimestamp.make("01", 15L), getter1.get("C"));
}
}
@@ -156,21 +157,21 @@ public class KTableSourceTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc1.checkAndClearProcessResult("A:(01<-null) (ts: 0)", "B:(01<-null) (ts: 0)", "C:(01<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
+ proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc1.checkAndClearProcessResult("A:(02<-null) (ts: 0)", "B:(02<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
+ proc1.checkAndClearProcessResult("A:(02<-null) (ts: 8)", "B:(02<-null) (ts: 22)");
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc1.checkAndClearProcessResult("A:(03<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
+ proc1.checkAndClearProcessResult("A:(03<-null) (ts: 12)");
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
- proc1.checkAndClearProcessResult("A:(null<-null) (ts: 0)", "B:(null<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
+ proc1.checkAndClearProcessResult("A:(null<-null) (ts: 15)", "B:(null<-null) (ts: 20)");
}
}
@@ -190,21 +191,21 @@ public class KTableSourceTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
- driver.pipeInput(recordFactory.create(topic1, "A", "01"));
- driver.pipeInput(recordFactory.create(topic1, "B", "01"));
- driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc1.checkAndClearProcessResult("A:(01<-null) (ts: 0)", "B:(01<-null) (ts: 0)", "C:(01<-null) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
+ proc1.checkAndClearProcessResult("A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");
- driver.pipeInput(recordFactory.create(topic1, "A", "02"));
- driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc1.checkAndClearProcessResult("A:(02<-01) (ts: 0)", "B:(02<-01) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
+ proc1.checkAndClearProcessResult("A:(02<-01) (ts: 8)", "B:(02<-01) (ts: 22)");
- driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc1.checkAndClearProcessResult("A:(03<-02) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
+ proc1.checkAndClearProcessResult("A:(03<-02) (ts: 12)");
- driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
- proc1.checkAndClearProcessResult("A:(null<-03) (ts: 0)", "B:(null<-02) (ts: 0)");
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
+ proc1.checkAndClearProcessResult("A:(null<-03) (ts: 15)", "B:(null<-02) (ts: 20)");
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index a9bf7f0..c2ddb54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -205,13 +205,13 @@ public class KTableTransformValuesTest {
expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
expect(parentGetterSupplier.get()).andReturn(parentGetter);
- expect(parentGetter.get("Key")).andReturn("Value");
+ expect(parentGetter.get("Key")).andReturn(ValueAndTimestamp.make("Value", -1L));
replay(parent, parentGetterSupplier, parentGetter);
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.init(context);
- final String result = getter.get("Key");
+ final String result = getter.get("Key").value();
assertThat(result, is("Key->Value!"));
}
@@ -228,7 +228,7 @@ public class KTableTransformValuesTest {
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.init(context);
- final String result = getter.get("Key");
+ final String result = getter.get("Key").value();
assertThat(result, is("something"));
}
@@ -327,11 +327,11 @@ public class KTableTransformValuesTest {
driver = new TopologyTestDriver(builder.build(), props());
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 5L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", (String) null, 15L));
- assertThat(output(), hasItems("A:A->a! (ts: 0)", "B:B->b! (ts: 0)", "D:D->null! (ts: 0)"));
+ assertThat(output(), hasItems("A:A->a! (ts: 5)", "B:B->b! (ts: 10)", "D:D->null! (ts: 15)"));
assertNull("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME));
}
@@ -351,16 +351,24 @@ public class KTableTransformValuesTest {
driver = new TopologyTestDriver(builder.build(), props());
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 5L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 10L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", (String) null, 15L));
- assertThat(output(), hasItems("A:A->a! (ts: 0)", "B:B->b! (ts: 0)", "C:C->null! (ts: 0)"));
+ assertThat(output(), hasItems("A:A->a! (ts: 5)", "B:B->b! (ts: 10)", "C:C->null! (ts: 15)"));
- final KeyValueStore<String, String> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
- assertThat(keyValueStore.get("A"), is("A->a!"));
- assertThat(keyValueStore.get("B"), is("B->b!"));
- assertThat(keyValueStore.get("C"), is("C->null!"));
+ {
+ final KeyValueStore<String, String> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
+ assertThat(keyValueStore.get("A"), is("A->a!"));
+ assertThat(keyValueStore.get("B"), is("B->b!"));
+ assertThat(keyValueStore.get("C"), is("C->null!"));
+ }
+ {
+ final KeyValueStore<String, ValueAndTimestamp<String>> keyValueStore = driver.getTimestampedKeyValueStore(QUERYABLE_NAME);
+ assertThat(keyValueStore.get("A"), is(ValueAndTimestamp.make("A->a!", 5L)));
+ assertThat(keyValueStore.get("B"), is(ValueAndTimestamp.make("B->b!", 10L)));
+ assertThat(keyValueStore.get("C"), is(ValueAndTimestamp.make("C->null!", 15L)));
+ }
}
@Test
@@ -380,11 +388,11 @@ public class KTableTransformValuesTest {
driver = new TopologyTestDriver(builder.build(), props());
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignore"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 5L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 15L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 10L));
- assertThat(output(), hasItems("A:1 (ts: 0)", "A:0 (ts: 0)", "A:2 (ts: 0)", "A:0 (ts: 0)", "A:3 (ts: 0)"));
+ assertThat(output(), hasItems("A:1 (ts: 5)", "A:0 (ts: 15)", "A:2 (ts: 15)", "A:0 (ts: 15)", "A:3 (ts: 15)"));
final KeyValueStore<String, Integer> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
assertThat(keyValueStore.get("A"), is(3));
@@ -403,11 +411,11 @@ public class KTableTransformValuesTest {
driver = new TopologyTestDriver(builder.build(), props());
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa"));
- driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa"));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 5L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa", 15L));
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa", 10));
- assertThat(output(), hasItems("A:1 (ts: 0)", "A:0 (ts: 0)", "A:2 (ts: 0)", "A:0 (ts: 0)", "A:3 (ts: 0)"));
+ assertThat(output(), hasItems("A:1 (ts: 5)", "A:0 (ts: 15)", "A:2 (ts: 15)", "A:0 (ts: 15)", "A:3 (ts: 15)"));
}
private ArrayList<String> output() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index a7151cd..43ad42a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -392,14 +392,14 @@ public class SuppressScenarioTest {
new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L),
- new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+ new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
new KeyValueTimestamp<>("[k1@4/6]", 1L, 5L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+ new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L)
)
);
@@ -444,9 +444,9 @@ public class SuppressScenarioTest {
new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L),
new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
- new KeyValueTimestamp<>("[k1@0/2]", 3L, 0L),
+ new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L),
new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
- new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+ new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L),
new KeyValueTimestamp<>("[k1@30/32]", 1L, 30L)
)
@@ -454,7 +454,7 @@ public class SuppressScenarioTest {
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+ new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L)
)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 743d710..0c4685a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -30,19 +30,19 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import static java.time.Duration.ofMillis;
@@ -68,53 +68,80 @@ public class TimeWindowedKStreamImplTest {
@Test
public void shouldCountWindowed() {
- final Map<Windowed<String>, Long> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
windowedStream
.count()
.toStream()
- .foreach(results::put);
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
+ equalTo(ValueAndTimestamp.make(2L, 15L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make(2L, 550L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make(1L, 500L)));
}
@Test
public void shouldReduceWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
windowedStream
.reduce(MockReducer.STRING_ADDER)
.toStream()
- .foreach(results::put);
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
+ equalTo(ValueAndTimestamp.make("1+2", 15L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make("10+20", 550L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make("3", 500L)));
}
@Test
public void shouldAggregateWindowed() {
- final Map<Windowed<String>, String> results = new HashMap<>();
+ final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
windowedStream
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
Materialized.with(Serdes.String(), Serdes.String()))
.toStream()
- .foreach(results::put);
+ .process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
}
- assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
- assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
- assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
+ equalTo(ValueAndTimestamp.make("0+1+2", 15L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make("0+10+20", 550L)));
+ assertThat(
+ supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+ .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
+ equalTo(ValueAndTimestamp.make("0+3", 500L)));
}
@Test
@@ -126,14 +153,27 @@ public class TimeWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
- final List<KeyValue<Windowed<String>, Long>> data =
- StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+ {
+ final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+ final List<KeyValue<Windowed<String>, Long>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(Arrays.asList(
+ assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L))));
+ }
+ {
+ final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+ driver.getTimestampedWindowStore("count-store");
+ final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)))));
+ }
}
}
@@ -147,14 +187,26 @@ public class TimeWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
- final List<KeyValue<Windowed<String>, String>> data =
- StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+ {
+ final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+ final List<KeyValue<Windowed<String>, String>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(Arrays.asList(
+ assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"))));
+ }
+ {
+ final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
+ final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)))));
+ }
}
}
@@ -169,14 +221,26 @@ public class TimeWindowedKStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
processData(driver);
- final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
- final List<KeyValue<Windowed<String>, String>> data =
- StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+ {
+ final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+ final List<KeyValue<Windowed<String>, String>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(Arrays.asList(
+ assertThat(data, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"))));
+ }
+ {
+ final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
+ final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+ StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+ assertThat(data, equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)))));
+ }
}
}
@@ -243,7 +307,8 @@ public class TimeWindowedKStreamImplTest {
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L));
- driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "10", 550L));
+ driver.pipeInput(recordFactory.create(TOPIC, "2", "20", 500L));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 068cb6b..52a5fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.Test;
@@ -60,15 +61,19 @@ public class TimestampedTupleForwarderTest {
expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
if (sendOldValues) {
- context.forward("key", new Change<>("newValue", "oldValue"));
+ context.forward("key1", new Change<>("newValue1", "oldValue1"));
+ context.forward("key2", new Change<>("newValue2", "oldValue2"), To.all().withTimestamp(42L));
} else {
- context.forward("key", new Change<>("newValue", null));
+ context.forward("key1", new Change<>("newValue1", null));
+ context.forward("key2", new Change<>("newValue2", null), To.all().withTimestamp(42L));
}
expectLastCall();
replay(store, context);
- new TimestampedTupleForwarder<>(store, context, null, sendOldValues)
- .maybeForward("key", "newValue", "oldValue");
+ final TimestampedTupleForwarder<String, String> forwarder =
+ new TimestampedTupleForwarder<>(store, context, null, sendOldValues);
+ forwarder.maybeForward("key1", "newValue1", "oldValue1");
+ forwarder.maybeForward("key2", "newValue2", "oldValue2", 42L);
verify(store, context);
}
@@ -81,8 +86,10 @@ public class TimestampedTupleForwarderTest {
expect(store.setFlushListener(null, false)).andReturn(true);
replay(store, context);
- new TimestampedTupleForwarder<>(store, context, null, false)
- .maybeForward("key", "newValue", "oldValue");
+ final TimestampedTupleForwarder<String, String> forwarder =
+ new TimestampedTupleForwarder<>(store, context, null, false);
+ forwarder.maybeForward("key", "newValue", "oldValue");
+ forwarder.maybeForward("key", "newValue", "oldValue", 42L);
verify(store, context);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index 0ff5c8a..0c7a015 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -35,6 +36,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<K> processedKeys = new ArrayList<>();
public final ArrayList<V> processedValues = new ArrayList<>();
+ public final ArrayList<KeyValueTimestamp> processedWithTimestamps = new ArrayList<>();
public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
@@ -81,6 +83,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public void process(final K key, final V value) {
processedKeys.add(key);
processedValues.add(value);
+ processedWithTimestamps.add(new KeyValueTimestamp<>(key, value, context().timestamp()));
if (value != null) {
lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
} else {
@@ -105,6 +108,17 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
}
processed.clear();
+ processedWithTimestamps.clear();
+ }
+
+ public void checkAndClearProcessResult(final KeyValueTimestamp... expected) {
+ assertEquals("the number of outputs:" + processed, expected.length, processed.size());
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals("output[" + i + "]:", expected[i], processedWithTimestamps.get(i));
+ }
+
+ processed.clear();
+ processedWithTimestamps.clear();
}
public void requestCommit() {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
index 7f1da6e..749c41f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -27,6 +27,8 @@ import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
import java.util.List;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
@@ -48,8 +50,7 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
@Override
public V putIfAbsent(final K key,
final V value) {
- final ValueAndTimestamp<V> old = inner.putIfAbsent(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
- return old == null ? null : old.value();
+ return getValueOrNull(inner.putIfAbsent(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)));
}
@Override
@@ -61,8 +62,7 @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V>
@Override
public V delete(final K key) {
- final ValueAndTimestamp<V> old = inner.delete(key);
- return old == null ? null : old.value();
+ return getValueOrNull(inner.delete(key));
}
@Override