You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/10/13 18:19:58 UTC
[kafka] branch trunk updated: KAFKA-10494: Eager handling of
sending old values (#9415)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40ad4fe KAFKA-10494: Eager handling of sending old values (#9415)
40ad4fe is described below
commit 40ad4fe0ae9c687ed3c4d35fb5f5830cb1a867b8
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Tue Oct 13 19:19:05 2020 +0100
KAFKA-10494: Eager handling of sending old values (#9415)
Nodes that are materialized should not forward `enableSendingOldValues` requests to their parent nodes, as they can fulfill such requests themselves. However, some instances of `KTableProcessorSupplier` were still forwarding these requests to parent nodes, which caused unnecessary materialization of table sources.
The following instances of `KTableProcessorSupplier` have been updated to not forward `enableSendingOldValues` to parent nodes if they themselves are materialized and can handle sending old values downstream:
* `KTableFilter`
* `KTableMapValues`
* `KTableTransformValues`
Other instances of `KTableProcessorSupplier` have not been modified for reasons given below:
* `KTableSuppressProcessorSupplier`: though it has a `storeName` field, it didn't seem right for this class to handle sending old values itself. Its only job is to suppress output.
* `KTableKTableAbstractJoin`: doesn't have a store name, i.e. it is never materialized, so can't handle the call itself.
* `KTableKTableJoinMerger`: table-table joins already have materialized sources, which are sending old values. It would be an unnecessary performance hit to have this class do a lookup to retrieve the old value from its store.
* `KTableReduce`: is always materialized and already handles the call without forwarding.
* `KTableAggregate`: is always materialized and already handles the call without forwarding.
Reviewer: Matthias J. Sax <ma...@confluent.io>
---
.../streams/kstream/internals/KTableFilter.java | 19 ++++++-
.../streams/kstream/internals/KTableMapValues.java | 20 ++++++-
.../kstream/internals/KTableTransformValues.java | 5 ++
.../kstream/internals/KTableFilterTest.java | 53 ++++++++++++++----
.../internals/KTableKTableLeftJoinTest.java | 5 ++
.../kstream/internals/KTableMapValuesTest.java | 65 +++++++++++++++++-----
.../internals/KTableTransformValuesTest.java | 22 ++++++--
7 files changed, 158 insertions(+), 31 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 0d41043..1a9d773 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableImpl<K, ?, V> parent;
private final Predicate<? super K, ? super V> predicate;
@@ -49,6 +51,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
@Override
public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (queryableName != null) {
+ sendOldValues = true;
+ return true;
+ }
+
if (parent.enableSendingOldValues(forceMaterialization)) {
sendOldValues = true;
}
@@ -100,7 +107,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
@Override
public void process(final K key, final Change<V> change) {
final V newValue = computeValue(key, change.newValue);
- final V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
+ final V oldValue = computeOldValue(key, change);
if (sendOldValues && oldValue == null && newValue == null) {
return; // unnecessary to forward here.
@@ -113,6 +120,16 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
context().forward(key, new Change<>(newValue, oldValue));
}
}
+
+ private V computeOldValue(final K key, final Change<V> change) {
+ if (!sendOldValues) {
+ return null;
+ }
+
+ return queryableName != null
+ ? getValueOrNull(store.get(key))
+ : computeValue(key, change.oldValue);
+ }
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 650cef1..a325920 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
@@ -67,9 +69,15 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
@Override
public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (queryableName != null) {
+ sendOldValues = true;
+ return true;
+ }
+
if (parent.enableSendingOldValues(forceMaterialization)) {
sendOldValues = true;
}
+
return sendOldValues;
}
@@ -117,7 +125,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
@Override
public void process(final K key, final Change<V> change) {
final V1 newValue = computeValue(key, change.newValue);
- final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
+ final V1 oldValue = computeOldValue(key, change);
if (queryableName != null) {
store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
@@ -126,6 +134,16 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
context().forward(key, new Change<>(newValue, oldValue));
}
}
+
+ private V1 computeOldValue(final K key, final Change<V> change) {
+ if (!sendOldValues) {
+ return null;
+ }
+
+ return queryableName != null
+ ? getValueOrNull(store.get(key))
+ : computeValue(key, change.oldValue);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index cf0517f..e4c76d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -73,6 +73,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
@Override
public boolean enableSendingOldValues(final boolean forceMaterialization) {
+ if (queryableName != null) {
+ sendOldValues = true;
+ return true;
+ }
+
if (parent.enableSendingOldValues(forceMaterialization)) {
sendOldValues = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 57b8bfb..7f772e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -48,6 +48,8 @@ import java.time.Instant;
import java.util.List;
import java.util.Properties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -321,6 +323,8 @@ public class KTableFilterTest {
topology.addProcessor("proc1", supplier, table1.name);
topology.addProcessor("proc2", supplier, table2.name);
+ final boolean parentSendOldVals = table1.sendingOldValueEnabled();
+
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<String, Integer> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
@@ -330,31 +334,47 @@ public class KTableFilterTest {
inputTopic.pipeInput("C", 1, 15L);
final List<MockApiProcessor<String, Integer, Void, Void>> processors = supplier.capturedProcessors(2);
+ final MockApiProcessor<String, Integer, Void, Void> table1Output = processors.get(0);
+ final MockApiProcessor<String, Integer, Void, Void> table2Output = processors.get(1);
- processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+ table1Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
- new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
- processors.get(1).checkEmptyAndClearProcessResult();
+ new KeyValueTimestamp<>("C", new Change<>(1, null), 15)
+ );
+ table2Output.checkEmptyAndClearProcessResult();
inputTopic.pipeInput("A", 2, 15L);
inputTopic.pipeInput("B", 2, 8L);
- processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 15),
- new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
- processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
- new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
+ table1Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(2, parentSendOldVals ? 1 : null), 15),
+ new KeyValueTimestamp<>("B", new Change<>(2, parentSendOldVals ? 1 : null), 8)
+ );
+ table2Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
+ new KeyValueTimestamp<>("B", new Change<>(2, null), 8)
+ );
inputTopic.pipeInput("A", 3, 20L);
- processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
- processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 2), 20));
+ table1Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(3, parentSendOldVals ? 2 : null), 20)
+ );
+ table2Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(null, 2), 20)
+ );
inputTopic.pipeInput("A", null, 10L);
inputTopic.pipeInput("B", null, 20L);
- processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 10),
- new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
- processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
+ table1Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(null, parentSendOldVals ? 3 : null), 10),
+ new KeyValueTimestamp<>("B", new Change<>(null, parentSendOldVals ? 2 : null), 20)
+ );
+ table2Output.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("B", new Change<>(null, 2), 20)
+ );
}
}
@@ -370,6 +390,9 @@ public class KTableFilterTest {
table2.enableSendingOldValues(true);
+ assertThat(table1.sendingOldValueEnabled(), is(true));
+ assertThat(table2.sendingOldValueEnabled(), is(true));
+
doTestSendingOldValue(builder, table1, table2, topic1);
}
@@ -385,6 +408,9 @@ public class KTableFilterTest {
table2.enableSendingOldValues(true);
+ assertThat(table1.sendingOldValueEnabled(), is(false));
+ assertThat(table2.sendingOldValueEnabled(), is(true));
+
doTestSendingOldValue(builder, table1, table2, topic1);
}
@@ -400,6 +426,9 @@ public class KTableFilterTest {
table2.enableSendingOldValues(false);
+ assertThat(table1.sendingOldValueEnabled(), is(true));
+ assertThat(table2.sendingOldValueEnabled(), is(true));
+
doTestSendingOldValue(builder, table1, table2, topic1);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index dc33243..c1bc7fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -58,6 +58,7 @@ import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -323,6 +324,10 @@ public class KTableKTableLeftJoinTest {
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
+ assertThat(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled(), is(true));
+ assertThat(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled(), is(true));
+ assertThat(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled(), is(true));
+
final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
try (final TopologyTestDriver driver = new TopologyTestDriverWrapper(topology, props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index d48b38c..384d579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -43,10 +43,11 @@ import java.time.Instant;
import java.util.Properties;
import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
@SuppressWarnings("unchecked")
public class KTableMapValuesTest {
@@ -241,7 +242,7 @@ public class KTableMapValuesTest {
}
@Test
- public void testSendingOldValue() {
+ public void shouldEnableSendingOldValuesOnParentIfMapValuesNotMaterialized() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
@@ -249,36 +250,74 @@ public class KTableMapValuesTest {
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
final KTableImpl<String, String, Integer> table2 =
(KTableImpl<String, String, Integer>) table1.mapValues(s -> Integer.valueOf(s));
+
+ table2.enableSendingOldValues(true);
+
+ assertThat(table1.sendingOldValueEnabled(), is(true));
+ assertThat(table2.sendingOldValueEnabled(), is(true));
+
+ testSendingOldValues(builder, topic1, table2);
+ }
+
+ @Test
+ public void shouldNotEnableSendingOldValuesOnParentIfMapValuesMaterialized() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic1 = "topic1";
+
+ final KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+ final KTableImpl<String, String, Integer> table2 =
+ (KTableImpl<String, String, Integer>) table1.mapValues(
+ s -> Integer.valueOf(s),
+ Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("bob").withValueSerde(Serdes.Integer())
+ );
+
table2.enableSendingOldValues(true);
+ assertThat(table1.sendingOldValueEnabled(), is(false));
+ assertThat(table2.sendingOldValueEnabled(), is(true));
+
+ testSendingOldValues(builder, topic1, table2);
+ }
+
+ private void testSendingOldValues(
+ final StreamsBuilder builder,
+ final String topic1,
+ final KTableImpl<String, String, Integer> table2
+ ) {
final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
builder.build().addProcessor("proc", supplier, table2.name);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic1 =
- driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockApiProcessor<String, Integer, Void, Void> proc = supplier.theCapturedProcessor();
- assertTrue(table1.sendingOldValueEnabled());
- assertTrue(table2.sendingOldValueEnabled());
-
inputTopic1.pipeInput("A", "01", 5L);
inputTopic1.pipeInput("B", "01", 10L);
inputTopic1.pipeInput("C", "01", 15L);
- proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
- new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
- new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
+ proc.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+ new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+ new KeyValueTimestamp<>("C", new Change<>(1, null), 15)
+ );
inputTopic1.pipeInput("A", "02", 10L);
inputTopic1.pipeInput("B", "02", 8L);
- proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 10),
- new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
+ proc.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(2, 1), 10),
+ new KeyValueTimestamp<>("B", new Change<>(2, 1), 8)
+ );
inputTopic1.pipeInput("A", "03", 20L);
- proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
+ proc.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(3, 2), 20)
+ );
inputTopic1.pipeInput("A", (String) null, 30L);
- proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 30));
+ proc.checkAndClearProcessResult(
+ new KeyValueTimestamp<>("A", new Change<>(null, 3), 30)
+ );
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 4aac321..f448e31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
@@ -40,7 +41,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
@@ -58,6 +58,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
@@ -188,8 +189,11 @@ public class KTableTransformValuesTest {
}
@Test
- public void shouldSetSendOldValuesOnParent() {
- expect(parent.enableSendingOldValues(true)).andReturn(true);
+ public void shouldNotSetSendOldValuesOnParentIfMaterialized() {
+ expect(parent.enableSendingOldValues(anyBoolean()))
+ .andThrow(new AssertionError("Should not call enableSendingOldValues"))
+ .anyTimes();
+
replay(parent);
new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues(true);
@@ -197,6 +201,16 @@ public class KTableTransformValuesTest {
verify(parent);
}
+ @Test
+ public void shouldSetSendOldValuesOnParentIfNotMaterialized() {
+ expect(parent.enableSendingOldValues(true)).andReturn(true);
+ replay(parent);
+
+ new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), null).enableSendingOldValues(true);
+
+ verify(parent);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void shouldTransformOnGetIfNotMaterialized() {