You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/09/25 20:59:24 UTC
[kafka] branch trunk updated: KAFKA-10077: Filter downstream of
state-store results in spurious tombstones (#9156)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc81d44 KAFKA-10077: Filter downstream of state-store results in spurious tombstones (#9156)
dc81d44 is described below
commit dc81d442dfc5c3d8d9c069942f6b00e2de377a3c
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Fri Sep 25 21:58:04 2020 +0100
KAFKA-10077: Filter downstream of state-store results in spurious tombstones (#9156)
Reviewers: Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/kstream/internals/KTableAggregate.java | 4 +-
.../streams/kstream/internals/KTableFilter.java | 13 +-
.../streams/kstream/internals/KTableImpl.java | 21 ++-
.../internals/KTableKTableAbstractJoin.java | 8 +-
.../kstream/internals/KTableKTableJoinMerger.java | 8 +-
.../streams/kstream/internals/KTableMapValues.java | 8 +-
.../kstream/internals/KTableProcessorSupplier.java | 16 ++-
.../streams/kstream/internals/KTableReduce.java | 4 +-
.../kstream/internals/KTableRepartitionMap.java | 2 +-
.../kstream/internals/KTableTransformValues.java | 9 +-
.../suppress/KTableSuppressProcessorSupplier.java | 6 +-
.../kstream/internals/KTableFilterTest.java | 39 +++++-
.../streams/kstream/internals/KTableImplTest.java | 151 +++++++++++++++++----
.../internals/KTableKTableInnerJoinTest.java | 2 +-
.../internals/KTableKTableLeftJoinTest.java | 2 +-
.../internals/KTableKTableOuterJoinTest.java | 2 +-
.../kstream/internals/KTableMapValuesTest.java | 2 +-
.../kstream/internals/KTableSourceTest.java | 2 +-
.../internals/KTableTransformValuesTest.java | 10 +-
.../kafka/streams/scala/kstream/KTableTest.scala | 20 ++-
20 files changed, 247 insertions(+), 82 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index be0a7a8..1e64746 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -47,8 +47,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
@Override
- public void enableSendingOldValues() {
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ // Aggregates are always materialized:
sendOldValues = true;
+ return true;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 6c9d452..0d41043 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -28,7 +28,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
private final Predicate<? super K, ? super V> predicate;
private final boolean filterNot;
private final String queryableName;
- private boolean sendOldValues = false;
+ private boolean sendOldValues;
KTableFilter(final KTableImpl<K, ?, V> parent,
final Predicate<? super K, ? super V> predicate,
@@ -38,6 +38,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
this.predicate = predicate;
this.filterNot = filterNot;
this.queryableName = queryableName;
+ // If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones:
+ this.sendOldValues = parent.enableSendingOldValues(false);
}
@Override
@@ -46,9 +48,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}
@Override
- public void enableSendingOldValues() {
- parent.enableSendingOldValues();
- sendOldValues = true;
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (parent.enableSendingOldValues(forceMaterialization)) {
+ sendOldValues = true;
+ }
+ return sendOldValues;
}
private V computeValue(final K key, final V value) {
@@ -141,7 +145,6 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
this.parentGetter = parentGetter;
}
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
parentGetter.init(context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 74939b3..97d7e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -701,10 +701,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) other));
if (leftOuter) {
- enableSendingOldValues();
+ enableSendingOldValues(true);
}
if (rightOuter) {
- ((KTableImpl<?, ?, ?>) other).enableSendingOldValues();
+ ((KTableImpl<?, ?, ?>) other).enableSendingOldValues(true);
}
final KTableKTableAbstractJoin<K, VR, V, VO> joinThis;
@@ -807,7 +807,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
- this.enableSendingOldValues();
+ this.enableSendingOldValues(true);
return new KGroupedTableImpl<>(
builder,
selectName,
@@ -832,18 +832,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
}
@SuppressWarnings("unchecked")
- public void enableSendingOldValues() {
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+ if (!forceMaterialization && !source.materialized()) {
+ return false;
+ }
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
} else {
- ((KTableProcessorSupplier<K, S, V>) processorSupplier).enableSendingOldValues();
+ final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
+ if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
+ return false;
+ }
}
sendOldValues = true;
}
+ return true;
}
boolean sendingOldValueEnabled() {
@@ -967,11 +974,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
//Old values are a useful optimization. The old values from the foreignKeyTable table are compared to the new values,
//such that identical values do not cause a prefixScan. PrefixScan and propagation can be expensive and should
//not be done needlessly.
- ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues();
+ ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//This occurs whenever the extracted foreignKey changes values.
- enableSendingOldValues();
+ enableSendingOldValues(true);
final NamedInternal renamed = new NamedInternal(joinName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index b0189df..ecaaf45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -39,9 +39,11 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessor
}
@Override
- public final void enableSendingOldValues() {
- table1.enableSendingOldValues();
- table2.enableSendingOldValues();
+ public final boolean enableSendingOldValues(final boolean forceMaterialization) {
+ // Table-table joins require upstream materialization:
+ table1.enableSendingOldValues(true);
+ table2.enableSendingOldValues(true);
sendOldValues = true;
+ return true;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 86088c2..c669fb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -77,10 +77,12 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
}
@Override
- public void enableSendingOldValues() {
- parent1.enableSendingOldValues();
- parent2.enableSendingOldValues();
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ // Table-table joins require upstream materialization:
+ parent1.enableSendingOldValues(true);
+ parent2.enableSendingOldValues(true);
sendOldValues = true;
+ return true;
}
public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index e734457..650cef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -66,9 +66,11 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
@Override
- public void enableSendingOldValues() {
- parent.enableSendingOldValues();
- sendOldValues = true;
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (parent.enableSendingOldValues(forceMaterialization)) {
+ sendOldValues = true;
+ }
+ return sendOldValues;
}
private V1 computeValue(final K key, final V value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index 4be6c5b..ff6f9d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -22,5 +22,19 @@ public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, C
KTableValueGetterSupplier<K, T> view();
- void enableSendingOldValues();
+ /**
+ * Potentially enables sending old values.
+ * <p>
+ * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
+ * enable sending old values.
+ * <p>
+ * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values <i>if</i>
+ * an upstream node is already materialized.
+ *
+ * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
+ * values.
+ * @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
+ * {@code true} or some upstream node is materialized.
+ */
+ boolean enableSendingOldValues(boolean forceMaterialization);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 171912f..88f62f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -41,8 +41,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
@Override
- public void enableSendingOldValues() {
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ // Reduce is always materialized:
sendOldValues = true;
+ return true;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 22e7dec..d5dc5db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -67,7 +67,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
* @throws IllegalStateException since this method should never be called
*/
@Override
- public void enableSendingOldValues() {
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
// this should never be called
throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 86da063..cf0517f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -72,12 +72,13 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
- public void enableSendingOldValues() {
- parent.enableSendingOldValues();
- sendOldValues = true;
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (parent.enableSendingOldValues(forceMaterialization)) {
+ sendOldValues = true;
+ }
+ return sendOldValues;
}
-
private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private TimestampedKeyValueStore<K, V1> store;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index a08c2b0..6a300b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -50,7 +50,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
this.storeName = storeName;
this.parentKTable = parentKTable;
// The suppress buffer requires seeing the old values, to support the prior value view.
- parentKTable.enableSendingOldValues();
+ parentKTable.enableSendingOldValues(true);
}
@Override
@@ -108,8 +108,8 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
}
@Override
- public void enableSendingOldValues() {
- parentKTable.enableSendingOldValues();
+ public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ return parentKTable.enableSendingOldValues(forceMaterialization);
}
private static final class KTableSuppressProcessor<K, V> extends AbstractProcessor<K, Change<V>> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index b61f77b..57b8bfb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -297,12 +297,24 @@ public class KTableFilterTest {
doTestNotSendingOldValue(builder, table1, table2, topic1);
}
+ @Test
+ public void shouldNotEnableSendingOldValuesIfNotAlreadyMaterializedAndNotForcedToMaterialize() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ final KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+ final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+ table2.enableSendingOldValues(false);
+
+ doTestNotSendingOldValue(builder, table1, table2, topic1);
+ }
+
private void doTestSendingOldValue(final StreamsBuilder builder,
final KTableImpl<String, Integer, Integer> table1,
final KTableImpl<String, Integer, Integer> table2,
final String topic1) {
- table2.enableSendingOldValues();
-
final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
final Topology topology = builder.build();
@@ -347,7 +359,7 @@ public class KTableFilterTest {
}
@Test
- public void shouldSendOldValuesWhenEnabledWithoutMaterialization() {
+ public void shouldEnableSendOldValuesWhenNotMaterializedAlreadyButForcedToMaterialize() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
@@ -356,11 +368,13 @@ public class KTableFilterTest {
final KTableImpl<String, Integer, Integer> table2 =
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+ table2.enableSendingOldValues(true);
+
doTestSendingOldValue(builder, table1, table2, topic1);
}
@Test
- public void shouldSendOldValuesWhenEnabledOnMaterialization() {
+ public void shouldEnableSendOldValuesWhenMaterializedAlreadyAndForcedToMaterialize() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
@@ -369,6 +383,23 @@ public class KTableFilterTest {
final KTableImpl<String, Integer, Integer> table2 =
(KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
+ table2.enableSendingOldValues(true);
+
+ doTestSendingOldValue(builder, table1, table2, topic1);
+ }
+
+ @Test
+ public void shouldSendOldValuesWhenEnabledOnUpStreamMaterialization() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ final KTableImpl<String, Integer, Integer> table1 =
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed, Materialized.as("store2"));
+ final KTableImpl<String, Integer, Integer> table2 =
+ (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+ table2.enableSendingOldValues(false);
+
doTestSendingOldValue(builder, table1, table2, topic1);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 3e21ae1..c212df2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
@@ -42,7 +43,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
@@ -60,6 +60,8 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.mock;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -103,7 +105,7 @@ public class KTableImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
- driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
inputTopic.pipeInput("A", "01", 5L);
inputTopic.pipeInput("B", "02", 100L);
inputTopic.pipeInput("C", "03", 0L);
@@ -113,30 +115,103 @@ public class KTableImplTest {
}
final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
- assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
- new KeyValueTimestamp<>("B", "02", 100),
- new KeyValueTimestamp<>("C", "03", 0),
- new KeyValueTimestamp<>("D", "04", 0),
- new KeyValueTimestamp<>("A", "05", 10),
- new KeyValueTimestamp<>("A", "06", 8)), processors.get(0).processed());
- assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
- new KeyValueTimestamp<>("B", 2, 100),
- new KeyValueTimestamp<>("C", 3, 0),
- new KeyValueTimestamp<>("D", 4, 0),
- new KeyValueTimestamp<>("A", 5, 10),
- new KeyValueTimestamp<>("A", 6, 8)), processors.get(1).processed());
- assertEquals(asList(new KeyValueTimestamp<>("A", null, 5),
- new KeyValueTimestamp<>("B", 2, 100),
- new KeyValueTimestamp<>("C", null, 0),
- new KeyValueTimestamp<>("D", 4, 0),
- new KeyValueTimestamp<>("A", null, 10),
- new KeyValueTimestamp<>("A", 6, 8)), processors.get(2).processed());
- assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
- new KeyValueTimestamp<>("B", "02", 100),
- new KeyValueTimestamp<>("C", "03", 0),
- new KeyValueTimestamp<>("D", "04", 0),
- new KeyValueTimestamp<>("A", "05", 10),
- new KeyValueTimestamp<>("A", "06", 8)), processors.get(3).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", "01", 5),
+ new KeyValueTimestamp<>("B", "02", 100),
+ new KeyValueTimestamp<>("C", "03", 0),
+ new KeyValueTimestamp<>("D", "04", 0),
+ new KeyValueTimestamp<>("A", "05", 10),
+ new KeyValueTimestamp<>("A", "06", 8)),
+ processors.get(0).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", 1, 5),
+ new KeyValueTimestamp<>("B", 2, 100),
+ new KeyValueTimestamp<>("C", 3, 0),
+ new KeyValueTimestamp<>("D", 4, 0),
+ new KeyValueTimestamp<>("A", 5, 10),
+ new KeyValueTimestamp<>("A", 6, 8)),
+ processors.get(1).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", null, 5),
+ new KeyValueTimestamp<>("B", 2, 100),
+ new KeyValueTimestamp<>("C", null, 0),
+ new KeyValueTimestamp<>("D", 4, 0),
+ new KeyValueTimestamp<>("A", null, 10),
+ new KeyValueTimestamp<>("A", 6, 8)),
+ processors.get(2).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", "01", 5),
+ new KeyValueTimestamp<>("B", "02", 100),
+ new KeyValueTimestamp<>("C", "03", 0),
+ new KeyValueTimestamp<>("D", "04", 0),
+ new KeyValueTimestamp<>("A", "05", 10),
+ new KeyValueTimestamp<>("A", "06", 8)),
+ processors.get(3).processed());
+ }
+
+ @Test
+ public void testMaterializedKTable() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+
+ final KTable<String, String> table1 = builder.table(topic1, consumed, Materialized.as("fred"));
+
+ final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
+ table1.toStream().process(supplier);
+
+ final KTable<String, Integer> table2 = table1.mapValues(s -> Integer.valueOf(s));
+ table2.toStream().process(supplier);
+
+ final KTable<String, Integer> table3 = table2.filter((key, value) -> (value % 2) == 0);
+ table3.toStream().process(supplier);
+ table1.toStream().to(topic2, produced);
+
+ final KTable<String, String> table4 = builder.table(topic2, consumed);
+ table4.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer());
+ inputTopic.pipeInput("A", "01", 5L);
+ inputTopic.pipeInput("B", "02", 100L);
+ inputTopic.pipeInput("C", "03", 0L);
+ inputTopic.pipeInput("D", "04", 0L);
+ inputTopic.pipeInput("A", "05", 10L);
+ inputTopic.pipeInput("A", "06", 8L);
+ }
+
+ final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", "01", 5),
+ new KeyValueTimestamp<>("B", "02", 100),
+ new KeyValueTimestamp<>("C", "03", 0),
+ new KeyValueTimestamp<>("D", "04", 0),
+ new KeyValueTimestamp<>("A", "05", 10),
+ new KeyValueTimestamp<>("A", "06", 8)),
+ processors.get(0).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", 1, 5),
+ new KeyValueTimestamp<>("B", 2, 100),
+ new KeyValueTimestamp<>("C", 3, 0),
+ new KeyValueTimestamp<>("D", 4, 0),
+ new KeyValueTimestamp<>("A", 5, 10),
+ new KeyValueTimestamp<>("A", 6, 8)),
+ processors.get(1).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("B", 2, 100),
+ new KeyValueTimestamp<>("D", 4, 0),
+ new KeyValueTimestamp<>("A", 6, 8)),
+ processors.get(2).processed());
+ assertEquals(asList(
+ new KeyValueTimestamp<>("A", "01", 5),
+ new KeyValueTimestamp<>("B", "02", 100),
+ new KeyValueTimestamp<>("C", "03", 0),
+ new KeyValueTimestamp<>("D", "04", 0),
+ new KeyValueTimestamp<>("A", "05", 10),
+ new KeyValueTimestamp<>("A", "06", 8)),
+ processors.get(3).processed());
}
@Test
@@ -304,6 +379,30 @@ public class KTableImplTest {
}
}
+ @Test
+ public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KTableImpl<String, String, String> table =
+ (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+ table.enableSendingOldValues(false);
+
+ assertThat(table.sendingOldValueEnabled(), is(false));
+ }
+
+ @Test
+ public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMaterialize() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KTableImpl<String, String, String> table =
+ (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+ table.enableSendingOldValues(true);
+
+ assertThat(table.sendingOldValueEnabled(), is(true));
+ }
+
private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) {
for (final TopologyDescription.Node node: subtopology.nodes()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 8da8e50..f003b52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -153,7 +153,7 @@ public class KTableKTableInnerJoinTest {
table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
- ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+ ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index e6add21..dc33243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -321,7 +321,7 @@ public class KTableKTableLeftJoinTest {
table2 = builder.table(topic2, consumed);
joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+ ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index b689fea..9f99ae6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -308,7 +308,7 @@ public class KTableKTableOuterJoinTest {
table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
- ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
+ ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 0bca4ad..d48b38c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -249,7 +249,7 @@ public class KTableMapValuesTest {
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
final KTableImpl<String, String, Integer> table2 =
(KTableImpl<String, String, Integer>) table1.mapValues(s -> Integer.valueOf(s));
- table2.enableSendingOldValues();
+ table2.enableSendingOldValues(true);
final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
builder.build().addProcessor("proc", supplier, table2.name);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index a89167e..e503403 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -324,7 +324,7 @@ public class KTableSourceTest {
@SuppressWarnings("unchecked")
final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
- table1.enableSendingOldValues();
+ table1.enableSendingOldValues(true);
assertTrue(table1.sendingOldValueEnabled());
final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index fe2306a..4aac321 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -171,7 +171,10 @@ public class KTableTransformValuesTest {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
- transformValues.enableSendingOldValues();
+ expect(parent.enableSendingOldValues(true)).andReturn(true);
+ replay(parent);
+
+ transformValues.enableSendingOldValues(true);
final Processor<String, Change<String>> processor = transformValues.get();
processor.init(context);
@@ -186,11 +189,10 @@ public class KTableTransformValuesTest {
@Test
public void shouldSetSendOldValuesOnParent() {
- parent.enableSendingOldValues();
- expectLastCall();
+ expect(parent.enableSendingOldValues(true)).andReturn(true);
replay(parent);
- new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues();
+ new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues(true);
verify(parent);
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 2baca8c..8463cb3 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -39,29 +39,27 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
val sinkTopic = "sink"
val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
- table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+ table.filter((key, value) => key.equals("a") && value == 1).toStream.to(sinkTopic)
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, Long](sinkTopic)
{
- testInput.pipeInput("1", "value1")
+ testInput.pipeInput("a", "passes filter : add new row to table")
val record = testOutput.readKeyValue
- record.key shouldBe "1"
- record.value shouldBe (null: java.lang.Long)
+ record.key shouldBe "a"
+ record.value shouldBe 1
}
{
- testInput.pipeInput("1", "value2")
+ testInput.pipeInput("a", "fails filter : remove existing row from table")
val record = testOutput.readKeyValue
- record.key shouldBe "1"
- record.value shouldBe 2
+ record.key shouldBe "a"
+ record.value shouldBe (null: java.lang.Long)
}
{
- testInput.pipeInput("2", "value1")
- val record = testOutput.readKeyValue
- record.key shouldBe "2"
- record.value shouldBe (null: java.lang.Long)
+ testInput.pipeInput("b", "fails filter : no output")
+ testOutput.isEmpty shouldBe true
}
testOutput.isEmpty shouldBe true