Posted to commits@kafka.apache.org by mj...@apache.org on 2020/10/13 18:19:58 UTC

[kafka] branch trunk updated: KAFKA-10494: Eager handling of sending old values (#9415)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40ad4fe  KAFKA-10494: Eager handling of sending old values (#9415)
40ad4fe is described below

commit 40ad4fe0ae9c687ed3c4d35fb5f5830cb1a867b8
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Tue Oct 13 19:19:05 2020 +0100

    KAFKA-10494: Eager handling of sending old values (#9415)
    
    Nodes that are materialized should not forward `enableSendingOldValues` requests to their parent nodes, as they can fulfill the request themselves. However, some instances of `KTableProcessorSupplier` were still forwarding these requests to parent nodes, which caused unnecessary materialization of table sources.
    
    The following instances of `KTableProcessorSupplier` have been updated to no longer forward `enableSendingOldValues` to parent nodes if they are themselves materialized and can handle sending old values downstream (a simplified sketch of the shared pattern follows this list):
    
     * `KTableFilter`
     * `KTableMapValues`
     * `KTableTransformValues`
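
    A minimal sketch of the pattern these three classes now share, using illustrative
    stand-in names rather than the real Kafka Streams types (the actual methods are in
    the diff below):

        // Hypothetical stand-ins; only the control flow mirrors the real change.
        class Parent {
            boolean enableSendingOldValues(final boolean forceMaterialization) {
                // In the real code this may force materialization of an upstream table source.
                return true;
            }
        }

        class MaterializedAwareNode {
            private final Parent parent;
            private final String queryableName; // non-null => this node is materialized
            private boolean sendOldValues = false;

            MaterializedAwareNode(final Parent parent, final String queryableName) {
                this.parent = parent;
                this.queryableName = queryableName;
            }

            boolean enableSendingOldValues(final boolean forceMaterialization) {
                if (queryableName != null) {
                    // Materialized: old values can be served from this node's own store,
                    // so there is no need to involve the parent.
                    sendOldValues = true;
                    return true;
                }

                if (parent.enableSendingOldValues(forceMaterialization)) {
                    sendOldValues = true;
                }

                return sendOldValues;
            }
        }

    Returning true without consulting the parent is what prevents the parent chain, and
    ultimately the table source, from being materialized just to supply old values.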
    
    Other instances of `KTableProcessorSupplier` have not been modified, for the reasons given below:
     * `KTableSuppressProcessorSupplier`: though it has a `storeName` field, it did not seem right for it to handle sending old values itself, as its only job is to suppress output.
     * `KTableKTableAbstractJoin`: has no store name, i.e. it is never materialized, so it cannot handle the call itself.
     * `KTableKTableJoinMerger`: table-table joins already have materialized sources, which send old values. It would be an unnecessary performance hit to have this class do a lookup to retrieve the old value from its store.
     * `KTableReduce`: is always materialized and already handles the call without forwarding it.
     * `KTableAggregate`: is always materialized and already handles the call without forwarding it.
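
    As the `KTableKTableJoinMerger` note above alludes to, a node that handles
    `enableSendingOldValues` itself must resolve old values from its own state store
    rather than from the parent's change; the diff below adds a `computeOldValue`
    helper to `KTableFilter` and `KTableMapValues` for this. A hypothetical, simplified
    sketch of that lookup (Map and Function stand in for the real store and filter/map
    logic):

        import java.util.Map;
        import java.util.function.Function;

        // Stand-in for a materialized processor's old-value resolution; 'store',
        // 'queryableName', and 'mapper' approximate the real fields in the diff.
        final class OldValueSketch<K, V, VOut> {
            private final Map<K, VOut> store;        // this node's own state store
            private final String queryableName;      // non-null => this node is materialized
            private final Function<V, VOut> mapper;  // the node's filter/map logic
            private final boolean sendOldValues;

            OldValueSketch(final Map<K, VOut> store,
                           final String queryableName,
                           final Function<V, VOut> mapper,
                           final boolean sendOldValues) {
                this.store = store;
                this.queryableName = queryableName;
                this.mapper = mapper;
                this.sendOldValues = sendOldValues;
            }

            VOut computeOldValue(final K key, final V upstreamOldValue) {
                if (!sendOldValues) {
                    return null; // nothing downstream asked for old values
                }

                if (queryableName != null) {
                    // Materialized: the prior result is already in this node's own store.
                    return store.get(key);
                }

                // Not materialized: derive the old value from what the parent sent.
                return upstreamOldValue == null ? null : mapper.apply(upstreamOldValue);
            }
        }

    For `KTableKTableJoinMerger` this per-record store lookup is exactly the cost the
    change avoids, since its sources already send old values.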
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>
---
 .../streams/kstream/internals/KTableFilter.java    | 19 ++++++-
 .../streams/kstream/internals/KTableMapValues.java | 20 ++++++-
 .../kstream/internals/KTableTransformValues.java   |  5 ++
 .../kstream/internals/KTableFilterTest.java        | 53 ++++++++++++++----
 .../internals/KTableKTableLeftJoinTest.java        |  5 ++
 .../kstream/internals/KTableMapValuesTest.java     | 65 +++++++++++++++++-----
 .../internals/KTableTransformValuesTest.java       | 22 ++++++--
 7 files changed, 158 insertions(+), 31 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 0d41043..1a9d773 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
     private final KTableImpl<K, ?, V> parent;
     private final Predicate<? super K, ? super V> predicate;
@@ -49,6 +51,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     @Override
     public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (queryableName != null) {
+            sendOldValues = true;
+            return true;
+        }
+
         if (parent.enableSendingOldValues(forceMaterialization)) {
             sendOldValues = true;
         }
@@ -100,7 +107,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
         @Override
         public void process(final K key, final Change<V> change) {
             final V newValue = computeValue(key, change.newValue);
-            final V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
+            final V oldValue = computeOldValue(key, change);
 
             if (sendOldValues && oldValue == null && newValue == null) {
                 return; // unnecessary to forward here.
@@ -113,6 +120,16 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
                 context().forward(key, new Change<>(newValue, oldValue));
             }
         }
+
+        private V computeOldValue(final K key, final Change<V> change) {
+            if (!sendOldValues) {
+                return null;
+            }
+
+            return queryableName != null
+                ? getValueOrNull(store.get(key))
+                : computeValue(key, change.oldValue);
+        }
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 650cef1..a325920 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 
 class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
     private final KTableImpl<K, ?, V> parent;
@@ -67,9 +69,15 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
     @Override
     public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (queryableName != null) {
+            sendOldValues = true;
+            return true;
+        }
+
         if (parent.enableSendingOldValues(forceMaterialization)) {
             sendOldValues = true;
         }
+
         return sendOldValues;
     }
 
@@ -117,7 +125,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
         @Override
         public void process(final K key, final Change<V> change) {
             final V1 newValue = computeValue(key, change.newValue);
-            final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
+            final V1 oldValue = computeOldValue(key, change);
 
             if (queryableName != null) {
                 store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
@@ -126,6 +134,16 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
                 context().forward(key, new Change<>(newValue, oldValue));
             }
         }
+
+        private V1 computeOldValue(final K key, final Change<V> change) {
+            if (!sendOldValues) {
+                return null;
+            }
+
+            return queryableName != null
+                ? getValueOrNull(store.get(key))
+                : computeValue(key, change.oldValue);
+        }
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index cf0517f..e4c76d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -73,6 +73,11 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
 
     @Override
     public boolean enableSendingOldValues(final boolean forceMaterialization) {
+        if (queryableName != null) {
+            sendOldValues = true;
+            return true;
+        }
+
         if (parent.enableSendingOldValues(forceMaterialization)) {
             sendOldValues = true;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 57b8bfb..7f772e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -48,6 +48,8 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Properties;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -321,6 +323,8 @@ public class KTableFilterTest {
         topology.addProcessor("proc1", supplier, table1.name);
         topology.addProcessor("proc2", supplier, table2.name);
 
+        final boolean parentSendOldVals = table1.sendingOldValueEnabled();
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
             final TestInputTopic<String, Integer> inputTopic =
                     driver.createInputTopic(topic1, new StringSerializer(), new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
@@ -330,31 +334,47 @@ public class KTableFilterTest {
             inputTopic.pipeInput("C", 1, 15L);
 
             final List<MockApiProcessor<String, Integer, Void, Void>> processors = supplier.capturedProcessors(2);
+            final MockApiProcessor<String, Integer, Void, Void> table1Output = processors.get(0);
+            final MockApiProcessor<String, Integer, Void, Void> table2Output = processors.get(1);
 
-            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+            table1Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
                 new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
-                new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
-            processors.get(1).checkEmptyAndClearProcessResult();
+                new KeyValueTimestamp<>("C", new Change<>(1, null), 15)
+            );
+            table2Output.checkEmptyAndClearProcessResult();
 
             inputTopic.pipeInput("A", 2, 15L);
             inputTopic.pipeInput("B", 2, 8L);
 
-            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 15),
-                new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
-            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
-                new KeyValueTimestamp<>("B", new Change<>(2, null), 8));
+            table1Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(2, parentSendOldVals ? 1 : null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, parentSendOldVals ? 1 : null), 8)
+            );
+            table2Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(2, null), 15),
+                new KeyValueTimestamp<>("B", new Change<>(2, null), 8)
+            );
 
             inputTopic.pipeInput("A", 3, 20L);
 
-            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
-            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 2), 20));
+            table1Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(3, parentSendOldVals ? 2 : null), 20)
+            );
+            table2Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(null, 2), 20)
+            );
 
             inputTopic.pipeInput("A", null, 10L);
             inputTopic.pipeInput("B", null, 20L);
 
-            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 10),
-                new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
-            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<>("B", new Change<>(null, 2), 20));
+            table1Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(null, parentSendOldVals ? 3 : null), 10),
+                new KeyValueTimestamp<>("B", new Change<>(null, parentSendOldVals ? 2 : null), 20)
+            );
+            table2Output.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("B", new Change<>(null, 2), 20)
+            );
         }
     }
 
@@ -370,6 +390,9 @@ public class KTableFilterTest {
 
         table2.enableSendingOldValues(true);
 
+        assertThat(table1.sendingOldValueEnabled(), is(true));
+        assertThat(table2.sendingOldValueEnabled(), is(true));
+
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
@@ -385,6 +408,9 @@ public class KTableFilterTest {
 
         table2.enableSendingOldValues(true);
 
+        assertThat(table1.sendingOldValueEnabled(), is(false));
+        assertThat(table2.sendingOldValueEnabled(), is(true));
+
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
@@ -400,6 +426,9 @@ public class KTableFilterTest {
 
         table2.enableSendingOldValues(false);
 
+        assertThat(table1.sendingOldValueEnabled(), is(true));
+        assertThat(table2.sendingOldValueEnabled(), is(true));
+
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index dc33243..c1bc7fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -58,6 +58,7 @@ import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -323,6 +324,10 @@ public class KTableKTableLeftJoinTest {
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(true);
 
+        assertThat(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled(), is(true));
+        assertThat(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled(), is(true));
+        assertThat(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled(), is(true));
+
         final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
         try (final TopologyTestDriver driver = new TopologyTestDriverWrapper(topology, props)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index d48b38c..384d579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -43,10 +43,11 @@ import java.time.Instant;
 import java.util.Properties;
 
 import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("unchecked")
 public class KTableMapValuesTest {
@@ -241,7 +242,7 @@ public class KTableMapValuesTest {
     }
 
     @Test
-    public void testSendingOldValue() {
+    public void shouldEnableSendingOldValuesOnParentIfMapValuesNotMaterialized() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -249,36 +250,74 @@ public class KTableMapValuesTest {
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         final KTableImpl<String, String, Integer> table2 =
             (KTableImpl<String, String, Integer>) table1.mapValues(s -> Integer.valueOf(s));
+
+        table2.enableSendingOldValues(true);
+
+        assertThat(table1.sendingOldValueEnabled(), is(true));
+        assertThat(table2.sendingOldValueEnabled(), is(true));
+
+        testSendingOldValues(builder, topic1, table2);
+    }
+
+    @Test
+    public void shouldNotEnableSendingOldValuesOnParentIfMapValuesMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, String, String> table1 =
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+        final KTableImpl<String, String, Integer> table2 =
+            (KTableImpl<String, String, Integer>) table1.mapValues(
+                s -> Integer.valueOf(s),
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("bob").withValueSerde(Serdes.Integer())
+            );
+
         table2.enableSendingOldValues(true);
 
+        assertThat(table1.sendingOldValueEnabled(), is(false));
+        assertThat(table2.sendingOldValueEnabled(), is(true));
+
+        testSendingOldValues(builder, topic1, table2);
+    }
+
+    private void testSendingOldValues(
+        final StreamsBuilder builder,
+        final String topic1,
+        final KTableImpl<String, String, Integer> table2
+    ) {
         final MockApiProcessorSupplier<String, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
         builder.build().addProcessor("proc", supplier, table2.name);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             final TestInputTopic<String, String> inputTopic1 =
-                    driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+                driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockApiProcessor<String, Integer, Void, Void> proc = supplier.theCapturedProcessor();
 
-            assertTrue(table1.sendingOldValueEnabled());
-            assertTrue(table2.sendingOldValueEnabled());
-
             inputTopic1.pipeInput("A", "01", 5L);
             inputTopic1.pipeInput("B", "01", 10L);
             inputTopic1.pipeInput("C", "01", 15L);
-            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
-                    new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
-                    new KeyValueTimestamp<>("C", new Change<>(1, null), 15));
+            proc.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(1, null), 5),
+                new KeyValueTimestamp<>("B", new Change<>(1, null), 10),
+                new KeyValueTimestamp<>("C", new Change<>(1, null), 15)
+            );
 
             inputTopic1.pipeInput("A", "02", 10L);
             inputTopic1.pipeInput("B", "02", 8L);
-            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(2, 1), 10),
-                    new KeyValueTimestamp<>("B", new Change<>(2, 1), 8));
+            proc.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(2, 1), 10),
+                new KeyValueTimestamp<>("B", new Change<>(2, 1), 8)
+            );
 
             inputTopic1.pipeInput("A", "03", 20L);
-            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(3, 2), 20));
+            proc.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(3, 2), 20)
+            );
 
             inputTopic1.pipeInput("A", (String) null, 30L);
-            proc.checkAndClearProcessResult(new KeyValueTimestamp<>("A", new Change<>(null, 3), 30));
+            proc.checkAndClearProcessResult(
+                new KeyValueTimestamp<>("A", new Change<>(null, 3), 30)
+            );
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 4aac321..f448e31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -21,8 +21,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
@@ -40,7 +41,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.SingletonNoOpValueTransformer;
@@ -58,6 +58,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
@@ -188,8 +189,11 @@ public class KTableTransformValuesTest {
     }
 
     @Test
-    public void shouldSetSendOldValuesOnParent() {
-        expect(parent.enableSendingOldValues(true)).andReturn(true);
+    public void shouldNotSetSendOldValuesOnParentIfMaterialized() {
+        expect(parent.enableSendingOldValues(anyBoolean()))
+            .andThrow(new AssertionError("Should not call enableSendingOldValues"))
+            .anyTimes();
+
         replay(parent);
 
         new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues(true);
@@ -197,6 +201,16 @@ public class KTableTransformValuesTest {
         verify(parent);
     }
 
+    @Test
+    public void shouldSetSendOldValuesOnParentIfNotMaterialized() {
+        expect(parent.enableSendingOldValues(true)).andReturn(true);
+        replay(parent);
+
+        new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), null).enableSendingOldValues(true);
+
+        verify(parent);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldTransformOnGetIfNotMaterialized() {