Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/28 16:28:19 UTC

[GitHub] [kafka] Gerrrr opened a new pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Gerrrr opened a new pull request #11447:
URL: https://github.com/apache/kafka/pull/11447


   Right now, the `repartition` operator always filters out `null` keys. This behavior is not correct and is a regression compared to the deprecated `through` operator.
   
   This patch fixes the issue by filtering `null` keys only for optimizable repartition nodes. First, it removes the unnecessary processor node from the topology for `UnoptimizableRepartitionNode`. Second, it adds the filtering processor only if the node is optimizable. I introduced an `isOptimizable` boolean method for repartition nodes to make this property more obvious in the code.
   
   To verify that change, I added a new test to the repartition integration suite that produces pairs with `null` keys and expects them to be present after the repartition.
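   
   A minimal sketch of the gating described above, using simplified stand-in classes rather than the real graph nodes. `RepartitionNodeSketch`, `nodesToAdd`, the two subclasses, and the node names are hypothetical and only illustrate the idea; only the `isOptimizable` method name comes from this patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a repartition graph node; not the real Kafka Streams class.
abstract class RepartitionNodeSketch {
    // Mirrors the boolean method this patch introduces.
    abstract boolean isOptimizable();

    // Returns the names of the topology nodes this repartition would add.
    List<String> nodesToAdd() {
        final List<String> nodes = new ArrayList<>();
        if (isOptimizable()) {
            // Only optimizable repartitions get the not-null key filter.
            nodes.add("null-key-filter");
        }
        nodes.add("repartition-sink");
        nodes.add("repartition-source");
        return nodes;
    }
}

// Assumed example of a node that keeps the filter.
final class OptimizableSketch extends RepartitionNodeSketch {
    @Override
    boolean isOptimizable() {
        return true;
    }
}

// Assumed example of a node that must let null keys through.
final class UnoptimizableSketch extends RepartitionNodeSketch {
    @Override
    boolean isOptimizable() {
        return false;
    }
}
```

   With this shape, `new UnoptimizableSketch().nodesToAdd()` contains only the sink and source, so records with `null` keys are not dropped before the repartition topic.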
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1031269712


   > My rationale is that we do some optimizations at the end of the building phase, which may move some repartitions up in the topology, etc., and that effectively changes the topology. Hence, we would not know whether all of the downstream operators after the repartition filter null keys until the optimization that changed the topology is done.
   
   Ah, I missed that! Let me do another iteration on this patch then, to make sure that it does not get in the way of the suggested optimization.





[GitHub] [kafka] Gerrrr edited a comment on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Posted by GitBox <gi...@apache.org>.
Gerrrr edited a comment on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-956039330


   CI test failure is caused by a known flaky test ([KAFKA-8541](https://issues.apache.org/jira/browse/KAFKA-8541)), so probably unrelated.





[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1018475901


   @suhas-satish Good idea! I'll take some time next week to rebase and provide a proper description.





[GitHub] [kafka] guozhangwang commented on pull request #11447: KAFKA-13024

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1023902821


   Hello @Gerrrr @mjsax, thanks for kick-starting this issue! I made a first pass on it and have a few quick thoughts.
   
   1) First I want to make sure I understand the scope of the fix. My take is that we are only fixing `KStream`, since for `KTable` it's always safe to filter null keys; and again it's trying to maintain the original optimization, but only when all downstream operators after the repartition are going to filter null keys, i.e. as long as one downstream operator is not a per-key stateful operator that filters null keys, we should not apply this optimization. Is my understanding right?
   
   2) If 1) is right, then I'm wondering if we can still just use the `KStreamFilter`, but add it only at the end of topology building, i.e. in the `optimize` function, once we have constructed and learned about all the nodes in the topology. My main rationale is that we cannot decide whether the upstream optimization can be used until the topology has been fully constructed, because only then do we learn about all the downstream nodes.





[GitHub] [kafka] guozhangwang commented on pull request #11447: KAFKA-13024

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1029335322


   > I might be missing the full picture here as I am a bit new to the project, but, as I see it, the algorithm I described above does not suffer from this issue. There, we essentially extract `if (record.key() == null || record.value() == null) {` code from the operations (e.g. https://github.com/apache/kafka/pull/11447/files#diff-ad3728ff5f32ed1e88e11e46c0de970bcce01c39767fd3ae666e4a0bfbd97be3L87-L104), except we also don't check for that condition more than necessary.
   
   My rationale is that we do some optimizations at the end of the building phase, which may move some repartitions up in the topology, etc., and that effectively changes the topology. Hence, we would not know whether all of the downstream operators after the repartition filter null keys until the optimization that changed the topology is done.





[GitHub] [kafka] mjsax commented on a change in pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#discussion_r741604615



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
##########
@@ -119,5 +119,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
                 processorParameters
             );
         }
+
+        @Override
+        public boolean isOptimizable() {
+            return false;

Review comment:
       Why is a table repartitioning not optimizable?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
##########
@@ -672,6 +672,42 @@ public void shouldGoThroughRebalancingCorrectly() throws Exception {
         assertEquals(2, getNumberOfPartitionsForTopic(repartitionTopicName));
     }
 
+    @Test
+    public void shouldNotFilterOutNullKeysOnRepartition() throws Exception {
+        final String repartitionName = "repartition-test";
+        final long timestamp = System.currentTimeMillis();
+        sendEvents(
+            timestamp,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(2, "B"),
+                new KeyValue<>(3, null)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Repartitioned<String, String> repartitioned = Repartitioned.<String, String>as(repartitionName)
+            .withKeySerde(Serdes.String())
+            .withValueSerde(Serdes.String());
+
+        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .selectKey((key, value) -> value == null ? null : key.toString())
+            .repartition(repartitioned)
+            .mapValues(value -> value != null ? "mapped-" + value  : "default-value")

Review comment:
       Why do we need the `mapValues()` step in this test?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
##########
@@ -53,19 +53,13 @@ private UnoptimizableRepartitionNode(final String nodeName,
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         topologyBuilder.addInternalTopic(repartitionTopic, internalTopicProperties);
 
-        topologyBuilder.addProcessor(

Review comment:
       Not sure if I understand this change? Can you elaborate?







[GitHub] [kafka] suhas-satish commented on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Posted by GitBox <gi...@apache.org>.
suhas-satish commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1017738075


   @Gerrrr, do you want to mention streams-committers on v2 of this draft PR?





[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-1026044886


   Thanks for taking a look at this PR, @guozhangwang!
   
   >  we are only fixing KStream, since for KTable it's always safe to filter null keys
   
   Yes
   
   >  it's trying to maintain the original optimization but only when all downstream operators after the repartition are going to filter null keys, i.e. as long as one downstream operator is not a per-key stateful operator that filters null keys then we should not apply this optimization.
   
   This patch drops `null` keys on repartition only if the downstream operation drops them (see `KStreamRepartitionIntegrationTest#shouldDropNullKeysOnRepartitionWithDownstreamAggregate`).
   
   First, we mark operations that filter `null` keys, or both `null` keys and `null` values, as such via the corresponding `ProcessorParameters` (see https://github.com/apache/kafka/pull/11447/files#diff-b80820b46c327e9a393d3b2435f3c141745d7c99b02196744dbe2bbdb2fd95acR65-R67). Based on these parameters, we prepend decorator nodes in the logical topology before their graph nodes (see `InternalStreamsBuilder#addGraphNode`). Then, `InternalStreamsBuilder#applyDecorators` traverses the topology and removes redundant decorators (e.g. when multiple operations in a row filter out `null` keys). Finally, `InternalStreamsBuilder` removes decorators from the logical topology and attaches them to their successor nodes.
   
   When `ProcessorGraphNodes` call `writeToTopology`, they check whether a decorator is attached. If it is, the decorator wraps the processing code (e.g. https://github.com/apache/kafka/pull/11447/files#diff-61f97d0e2f4db6ed1fd994da154c3955fe943c76559217e27b0b1e10daba4623R85-R92).
   
   See `InternalStreamsBuilder#applyDecorators` for more details.
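   
   As a rough illustration of the "remove redundant decorators" step, here is a simplified sketch over a linear chain of operations. It is not the code in `InternalStreamsBuilder#applyDecorators`: `DecoratorPruningSketch` and `pruneRedundant` are hypothetical names, the real topology is a graph rather than a list, and the sketch assumes that operations inside a run do not change the key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a linear chain of operations, where true means
// "this operation drops records with null keys".
final class DecoratorPruningSketch {
    // Returns, per operation, whether it still needs its own null-key check
    // once redundant checks inside a consecutive run have been removed.
    static List<Boolean> pruneRedundant(final List<Boolean> dropsNullKeys) {
        final List<Boolean> needsCheck = new ArrayList<>();
        boolean previousDropped = false;
        for (final boolean drops : dropsNullKeys) {
            // Keep the check only at the start of a run of null-key-dropping operations;
            // later operations in the same run can only see non-null keys.
            needsCheck.add(drops && !previousDropped);
            previousDropped = drops;
        }
        return needsCheck;
    }
}
```

   For a chain where the first, second, and fourth operations drop `null` keys, only the first and fourth keep a check, because the second one directly follows an operation that already filtered.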
   
   > Can we just use the KStreamFilter?
   
   @mjsax was concerned that adding `KStreamFilter` will change the physical topology and might break user workflows if they rely on node names. This patch does not introduce new nodes to the physical topology.
   
   > we would not be able to decide whether the optimization on the upstream
   
   I might be missing the full picture here as I am a bit new to the project, but, as I see it, the algorithm I described above does not suffer from this issue. There, we essentially extract `if (record.key() == null || record.value() == null) {` code from the operations (e.g. https://github.com/apache/kafka/pull/11447/files#diff-ad3728ff5f32ed1e88e11e46c0de970bcce01c39767fd3ae666e4a0bfbd97be3L87-L104), except we also don't check for that condition more than necessary. 
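   
   To make the "extracted check" concrete, here is a small sketch of a wrapper that applies the null check once, outside the operation's own logic. `NullGuardSketch` and `skipNullKeyOrValue` are hypothetical names for illustration, and a plain `BiConsumer` stands in for the processor; the decorators in the patch may be structured differently.

```java
import java.util.function.BiConsumer;

// Hypothetical helper; not part of Kafka Streams.
final class NullGuardSketch {
    // Wraps processing logic so that records with a null key or a null value
    // are skipped before they reach the wrapped logic.
    static <K, V> BiConsumer<K, V> skipNullKeyOrValue(final BiConsumer<K, V> delegate) {
        return (key, value) -> {
            if (key == null || value == null) {
                return; // dropped here, so the delegate needs no check of its own
            }
            delegate.accept(key, value);
        };
    }
}
```

   An operation wrapped this way no longer needs its own `if (record.key() == null || record.value() == null)` branch, and an operation that is not wrapped performs no check at all.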





[GitHub] [kafka] Gerrrr commented on pull request #11447: KAFKA-13024: Use not-null filter only in optimizable repartitions

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on pull request #11447:
URL: https://github.com/apache/kafka/pull/11447#issuecomment-960765623


   Thanks for the review! I agree that solving this issue by removing the not-null optimization completely is very much suboptimal. 8f176cb reverts the first commit. 7c6e97f introduces another approach where `UnoptimizableRepartitionNode` adds the not-null filter iff all downstream operations drop the null keys.
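   
   A simplified sketch of that condition, under the assumption that "downstream operations" means the direct children of the repartition node. `OpNodeSketch`, `addChild`, and `canFilterNullKeysUpstream` are hypothetical names; the actual commit works on the real graph node classes and may traverse further.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical graph node; the real logical topology uses different classes.
final class OpNodeSketch {
    final boolean dropsNullKeys;
    final List<OpNodeSketch> children = new ArrayList<>();

    OpNodeSketch(final boolean dropsNullKeys) {
        this.dropsNullKeys = dropsNullKeys;
    }

    OpNodeSketch addChild(final OpNodeSketch child) {
        children.add(child);
        return this;
    }

    // True only if there is at least one direct downstream operation
    // and every one of them drops null keys.
    boolean canFilterNullKeysUpstream() {
        if (children.isEmpty()) {
            return false;
        }
        for (final OpNodeSketch child : children) {
            if (!child.dropsNullKeys) {
                return false;
            }
        }
        return true;
    }
}
```

   A repartition followed only by an aggregation could add the filter, while adding a second child that keeps `null` keys would turn the filter off again.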




