You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/10 17:48:54 UTC

[GitHub] [kafka] big-andy-coates opened a new pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

big-andy-coates opened a new pull request #9156:
URL: https://github.com/apache/kafka/pull/9156


   fixes: [KAFKA-10077](https://issues.apache.org/jira/browse/KAFKA-10077).
   
   Enable sending old values on `KTable.filter` call to avoid the filter forwarding tombstones for rows that do not exist in the output.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480998514



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       Sounds good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -203,6 +203,10 @@ public String queryableStoreName() {
             processorSupplier,
             tableNode,
             builder);
+
+        kTable.enableSendingOldValues();

Review comment:
       If we call `this.enableSendingOldValues()` then `enableSendingOldValues` is not called on the `KTableFilter` instance. Whereas calling `kTable.enableSendingOldValues()` does call `KTableFilter.enableSendingOldValues()`.
   
   The call to `KTableFilter.enableSendingOldValues()` is needed to ensure the filter expects the old values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r477622476



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       @guozhangwang Atm, when `enableSendingOldValues` is set and the upstream store is not materialized already we enforce a materialization. Thus, enabling `enableSendingOldValues` in `KTable#filter()` would be a breaking change as we would start to materialize state that did not exist before if one upgrades a topology.
   
   Instead, we want to say, _iff_ the upstream store exist, please send me the old value, but if the upstream store does not exist, it's ok to just send `old=null` but don't force a materialization.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489588705



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);

Review comment:
       Rename test to make it clear its force materialized due to sending old values, 
   
   `shouldSetSendingOldValuesIfMaterializationForcedInternally`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r474353504



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -203,6 +203,10 @@ public String queryableStoreName() {
             processorSupplier,
             tableNode,
             builder);
+
+        kTable.enableSendingOldValues();

Review comment:
       Don't we need to call `this.enableSendingOldValues()` as "this" is the upstream KTable the filter is applied to? Why do we need to enable sending old values on the result of the filter() ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -203,6 +203,10 @@ public String queryableStoreName() {
             processorSupplier,
             tableNode,
             builder);
+
+        kTable.enableSendingOldValues();

Review comment:
       Switching to enabling via the filter.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9156:
URL: https://github.com/apache/kafka/pull/9156


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696383217






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to stick with the pattern that's here.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489543472



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);

Review comment:
       Why? This test is about forced materialization. If we pass that parameter, then it would work even if not forced, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-671501927


   @mjsax as discussed. Please review.
   
   I think this change makes the output from the table filter semantically correct, i.e. we no longer output tombstones for rows that didn't exist in the output to begin with. However, this comes at a cost!  Now the source table is being materialized, (as you can see from the changes needed to get some other tests to pass).
   
   The cost of better semantics could be very high, and the user has no way of avoiding this. Where as previously the user could choose to 'fix' the bad semantics by manually calling `enableSendingOldValues` themselves, if they cared. I'm left feeling a little uneasy about _forcing_ users to pay the cost of materialization, even if they either don't care about the spurious tombstones, or their use case doesn't generate them.
   
   This leads me to the following questions:
   
   1. Wouldn't this be a breaking change for existing users of the library? If we stick with this solution, how would we handle this?
   1. Might it be better to only enable the sending of old values _if_ the source table is already materialized? This would mean the fix only pays the cost of an additional rocksdb read, which is still not zero, but much lower that forced materialization, and it would also mean this isn't a breaking change.
   
   Or maybe we choose to _not_ fix this. Preferring the current semantically incorrect, but better performing, solution with a known workaround for users that require correct semantics?  i.e. we could document the use of `enableSendingOldValues` in the `filter` method's java docs.
   
   Your thoughts my good man?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489542022



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {

Review comment:
       I prefer `should` too, but a lot of the code users `test`.  Happy to switch. Can I use `should` everywhere?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above](https://github.com/apache/kafka/pull/9156#discussion_r487057805)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-694203001


   @mjsax - ready for another review.
   
   - boolean `onlyIfMaterialized` is now flipped to `forceMaterialization`.
   - `KTableFilter` constructor initialized internal `sendOldValues` flag based on parent tables state. (Not perfect, but works).
   - Removed test that was testing bad behaviour.
   
   The only outstanding piece, as I see it, is the bad behaviour if you call `enableSendingOldValues` without forcing materialization on a table that is itself materialized, but who's upstream is not.  In such a situation, the table will _not_ enable sending old values. 
   
   A similar bug existed before this PR:  if you called `enableSendingOldValues` on a table that is itself materialized, but who's upstream is not, then it will force materialization on its upstream table, which is unnecessary.
   
   With the new code, both of the bad behaviours are true, i.e. the change introduces the first one and the second one is still happening.  
   
   I've raised https://issues.apache.org/jira/browse/KAFKA-10494 to track this.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-685493474


   @guozhangwang @mjsax ready for another review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r480997443



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -203,6 +203,10 @@ public String queryableStoreName() {
             processorSupplier,
             tableNode,
             builder);
+
+        kTable.enableSendingOldValues();

Review comment:
       If we call `this.enableSendingOldValues()` then `enableSendingOldValues` is not called on the `KTableFilter` instance. Whereas calling `kTable.enableSendingOldValues()` does call `KTableFilter.enableSendingOldValues()`.
   
   The call to `KTableFilter.enableSendingOldValues()` is needed to ensure the filter expects the old values.
   
   Is it more correct to call `enableSOV` on both the filter and `this` explicitly, so that the downstream `kTable` doesn't have it set?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057186



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a consensus we should flip, then happy enough to do it.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-677982605


   > Where as previously the user could choose to 'fix' the bad semantics by manually calling enableSendingOldValues themselves, if they cared.
   
   That is not entirely correct. `enableSendingOldValues` is _not_ part of the public `KTable` interface but only a method of the internal `KTableImpl` class. Thus, users don't not really have the ability to enable it.
   
   > Might it be better to only enable the sending of old values if the source table is already materialized?
   
   This seems to be the "right" fix from my point of view. Note, that people can force an upstream materialization via `Materialized.as()`, and thus the impact is quite clear and it's public API that allows user to opt-in for this optimization. (When we discussed in person, I alway assumed we would implement it this way). -- Doing it this way is also a non-breaking change and thus no backward compatibility concern raise.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487167789



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489539904



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       Feels a little off to me to ignore what the caller is asking.  If they pass `true` for `onlyIfMaterialized`, then we shouldn't silently ignore them IMHO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487172402



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696565610


   test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696383217


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489544014



##########
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##########
@@ -39,29 +39,27 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     val sinkTopic = "sink"
 
     val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
-    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+    table.filter((key, value) => key.equals("a") && value == 1).toStream.to(sinkTopic)

Review comment:
       So that we can test an existing row being removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490152263



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       As discussed offline - change the behaviour of `enableSendingOldValues` such that a node that is already materialized will not ask upstream nodes to also send old values, is deemed out of scope of this PR.
   
   @mjsax has requested a ticket to track this work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I guess it's subjective. Personally, I would prefer to flip it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGMT.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Ah. I though you changes `ProcessorSuppler#enableSendOldValues` but that was incorrect. You change `KTableProcessorSuppler#enableSendOldValues`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r474353890



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       This is surprising to me -- need to dig into the code I guess, but I though enabling to send old values would _not_ trigger any upstream materialization but only take effect _if_ the store is materializes already (but would be ignored otherwise). Seems my assumption is wrong though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-699151989


   Thanks for the patch @big-andy-coates!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r474361586



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       The issue seems to be this line: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L67
   
   In a source-KTable, if we enable sending old values we force an materialization. I guess, we should have this materialization optional?
   
   I would recommend to rename `enableSendingOldValues()` to `maybeEnableSendingOldValues(final boolean forceMaterialization)` or similar?
   
   \cc @guozhangwang WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490154341



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       Anyway, done...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490147517



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       OK, will flip.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057186



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I still would argue the semantics are not intuitive from the names alone.  (Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive. `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, must like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then I don't think we need to include a `maybe` or `IfPossible` in the name as this is implied already by the `onlyIf`.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060078



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057805



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).  So if we don't call `enableSendingOldValues` on it, it won't swallow the output when things haven't changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used to both signify that the upstream is sending old values, and to control if the filter should forward old values. I'll split these into two variables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487169320



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want that the upstream is sending us old values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487058548



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for the `source` to be aware of this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060171



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r475792671



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       `enableSendingOldValues` is indicated for sending a pair of <new, old> values, and if it is not set the `old` value would always be null. If we want to `enableSendingOldValues` then we'd have to materialize the source node still?
   
   Maybe I'm not fully understanding the context here. Could you bring me up to date?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r485846736



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Unrelated change: can we revert this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       Can we flip the logic of the boolean flag ? -- It makes a knob in my mind to say, `onlyIfMaterailized=false` implies that we need to (ie, enforce to) materialize...  I would prefer the suggest from Guozhang to use a flag like `enforceMaterialization` (or maybe better `materializeIfNeeded`).
   
   Also, the method may or may not enable sending old values and thus, we might want to rename it to `maybeEnableSendOldValues` (or `enableSendOldValuesIfPossible` or similar).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       Instead of the check, should we pass in `false` to enforce a materialization if necessary?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       Why does the `filter` need to send old values? I though the `filter` needs to _receive_ old values, and thus we should call `this. enableSendingOldValues(true)` to enable sending old values on the filter's input?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       Why do we need to call this (ie, why do we want the result of the filter to send old values)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {

Review comment:
       `shouldNotSetSendingOldValuesIfNotMaterialized`

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);

Review comment:
       Seems the `Matererialized.as("fred")` parameter is missing?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Seems the boolean input parameter is missing?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void shouldNotSendOldValuesOnMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
+
+        table2.enableSendingOldValues(true);

Review comment:
       As we enable this on the result `KTable`, and we enforce a materialization of the result, I would expect that we actually do get old-values sent?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
+    @Test
+    public void shouldNotSendOldValuesOnMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
+        final KTableImpl<String, Integer, Integer> table2 =
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
+
+        table2.enableSendingOldValues(true);
+
+        doTestNotSendingOldValue(builder, table1, table2, topic1);
+    }
+
     private void doTestSendingOldValue(final StreamsBuilder builder,
                                        final KTableImpl<String, Integer, Integer> table1,
                                        final KTableImpl<String, Integer, Integer> table2,
                                        final String topic1) {
-        table2.enableSendingOldValues();

Review comment:
       Should we actually keep this call within this method and pass in the boolean flag as parameter to `doTestSendingOldValue` ?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
##########
@@ -304,6 +363,30 @@ public void testStateStore() {
         }
     }
 
+    @Test
+    public void testSendingOldValuesNotSetIfNotMaterialized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTableImpl<String, String, String> table =
+            (KTableImpl<String, String, String>) builder.table("topic1", consumed);
+
+        table.enableSendingOldValues(true);
+
+        assertThat(table.sendingOldValueEnabled(), is(false));
+    }
+
+    @Test
+    public void testSendingOldValuesSetIfMaterializedForced() {

Review comment:
       `shouldSetSendingOldValuesIfMaterializationForced`

##########
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##########
@@ -39,29 +39,27 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     val sinkTopic = "sink"
 
     val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
-    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+    table.filter((key, value) => key.equals("a") && value == 1).toStream.to(sinkTopic)

Review comment:
       Why do we need to change the filter condition?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696383217


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487059154



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060078



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490152263



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       As discussed offline - change the behaviour of `enableSendingOldValues` such that a node that is already materialized will not ask upstream nodes to also send old values, is deemed out of scope of this PR.
   
   @mjsax has requested a ticket to track this work: https://issues.apache.org/jira/browse/KAFKA-10494




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r489538195



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       > If the filter result is materialized, we don't care if he upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.
   
   That's not how the code was working. It always asked the parent to send old values:
   
   ```java
       @Override
       public void enableSendingOldValues() {
           parent.enableSendingOldValues();
           sendOldValues = true;
       }
   ```
   
   Do we want to change this as part of this PR to only call `parent.enableSendingOldValues` is the filter is _not_ materialized?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487057805



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values, (existing code).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r478605835



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
       Ah I see.. what about changing the function as `enableSendingOldValues(boolean enforced)` in that case? And then only ktable aggregations which would definitely need this would set it to `true`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org