Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 10:04:23 UTC

[GitHub] [kafka] g1geordie opened a new pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

g1geordie opened a new pull request #10190:
URL: https://github.com/apache/kafka/pull/10190


   Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867800402


   cherry-picked to 2.8





[GitHub] [kafka] bbejeck merged pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck merged pull request #10190:
URL: https://github.com/apache/kafka/pull/10190


   





[GitHub] [kafka] g1geordie commented on a change in pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581599900



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
     public NamedInternal withName(final String name) {
         return new NamedInternal(name);
     }
-    
+
     String suffixWithOrElseGet(final String suffix, final String other) {
-        if (name != null) {
-            return name + suffix;
-        } else {
-            return other;
-        }
+        return Optional.ofNullable(name)
+                // Re-validate generated name as suffixed string could be too large.
+                .map(x -> withName(name + suffix).name)
+                .orElse(other);
     }
 
     String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
-        if (name != null) {
-            provider.newProcessorName(prefix);
+        final String defaultName = provider.newProcessorName(prefix);
 
-            final String suffixed = name + suffix;
-            // Re-validate generated name as suffixed string could be too large.
-            Named.validate(suffixed);
-
-            return suffixed;
-        } else {
-            return provider.newProcessorName(prefix);
-        }
+        return suffixWithOrElseGet(suffix, defaultName);
     }
 
     String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.

Review comment:
       The `NAMED_INTERMEDIATE_TOPOLOGY` constant used by `SuppressTopologyTest`:
   <pre><code>
   Topologies:
      Sub-topology: 0
       <b>Source: KSTREAM-SOURCE-0000000000 (topics: [input])</b>
         --> KSTREAM-AGGREGATE-0000000002
       <b>Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])</b>
         --> asdf
         <-- KSTREAM-SOURCE-0000000000
       <b>Processor: asdf (stores: [asdf-store])</b>
         --> KTABLE-TOSTREAM-0000000003
         <-- KSTREAM-AGGREGATE-0000000002
       <b>Processor: KTABLE-TOSTREAM-0000000003 (stores: [])</b>
         --> KSTREAM-SINK-0000000004
         <-- asdf
       <b>Sink: KSTREAM-SINK-0000000004 (topic: output)</b>
         <-- KTABLE-TOSTREAM-0000000003
   </code></pre>
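
A minimal sketch of the `suffixWithOrElseGet(suffix, other)` change in the hunk above, isolated from Kafka so it can be run on its own. The class `SuffixSketch`, the `validate` helper, and the 249-character limit are assumptions made for illustration; the real check lives in `Named.validate` and may differ.

```java
import java.util.Optional;

final class SuffixSketch {
    // Assumed limit for illustration only; not taken from the PR.
    static final int MAX_NAME_LENGTH = 249;

    // Hypothetical stand-in for the validation done when a name is set.
    static String validate(final String name) {
        if (name.isEmpty() || name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException("Illegal name length: " + name.length());
        }
        return name;
    }

    // If a custom name is present, append the suffix and re-validate the result,
    // because the suffixed string can exceed the limit even when the name did not.
    // Otherwise fall back to the supplied default.
    static String suffixWithOrElseGet(final String name, final String suffix, final String other) {
        return Optional.ofNullable(name)
                .map(n -> validate(n + suffix))
                .orElse(other);
    }

    public static void main(final String[] args) {
        System.out.println(suffixWithOrElseGet("asdf", "-store", "fallback")); // asdf-store
        System.out.println(suffixWithOrElseGet(null, "-store", "fallback"));   // fallback
    }
}
```

The point of routing the suffixed string through validation is that a name that is legal on its own can become illegal once `-store` or a similar suffix is appended.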







[GitHub] [kafka] mjsax commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-784444903


   Call for review @bbejeck
   
   @g1geordie there seem to be failing tests (`SuppressTopologyTest` and `RepartitionOptimizingTest`) relevant to this change.





[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-866910927


   @g1geordie apologies for letting this go for so long.  I'm looking now and let's see if we can't get this in over the next few days.





[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867765408


   merged #10190 into trunk





[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815594893


   @bbejeck
   Thanks for your response!
   
   I see the backward-compatibility note in `NamedInternal`:
   ```
   String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
           // We actually do not need to generate processor names for operation if a name is specified.
           // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
           ...
   }
   ```
   The index is not correct in `SuppressTopologyTest.shouldApplyNameToSuppressionNode`.

   Without the `.suppress(... .withName("asdf"))` name option, the topology is:
   ```
   Topologies:
      Sub-topology: 0
       Source: KSTREAM-SOURCE-0000000000 (topics: [input])
         --> KSTREAM-AGGREGATE-0000000002
       Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
         --> KTABLE-SUPPRESS-0000000003
         <-- KSTREAM-SOURCE-0000000000
       Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004])
         --> KTABLE-TOSTREAM-0000000005
         <-- KSTREAM-AGGREGATE-0000000002
       Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
         --> KSTREAM-SINK-0000000006
         <-- KTABLE-SUPPRESS-0000000003
       Sink: KSTREAM-SINK-0000000006 (topic: output)
         <-- KTABLE-TOSTREAM-0000000005
   ```
   So I changed the expected indexes to the following:
   ```
   Topologies:
      Sub-topology: 0
       Source: KSTREAM-SOURCE-0000000000 (topics: [input])
         --> KSTREAM-AGGREGATE-0000000002
       Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
         --> asdf
         <-- KSTREAM-SOURCE-0000000000
       Processor: asdf (stores: [asdf-store])
         --> KTABLE-TOSTREAM-0000000005
         <-- KSTREAM-AGGREGATE-0000000002
       Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
         --> KSTREAM-SINK-0000000006
         <-- asdf
       Sink: KSTREAM-SINK-0000000006 (topic: output)
         <-- KTABLE-TOSTREAM-0000000005  
   ```
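
A minimal sketch of what "burning" an index means, under simplifying assumptions: `CounterNameProvider` is a hypothetical stand-in for `InternalNameProvider`, the helper takes the custom name as a parameter instead of reading a field, and the suppression node consumes a single index here (in the topologies above it also consumes one for its state store).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class BurnIndexSketch {
    // Hypothetical stand-in for InternalNameProvider: a shared counter
    // rendered as a zero-padded 10-digit suffix.
    static final class CounterNameProvider {
        private final AtomicInteger index = new AtomicInteger(0);

        String newProcessorName(final String prefix) {
            return String.format(Locale.ROOT, "%s%010d", prefix, index.getAndIncrement());
        }
    }

    // Always consumes one index, even when the custom name is used,
    // so that later generated names do not shift.
    static String orElseGenerateWithPrefix(final String customName,
                                           final CounterNameProvider provider,
                                           final String prefix) {
        final String generated = provider.newProcessorName(prefix);
        return customName != null ? customName : generated;
    }

    // Names a three-node chain; only the middle node may carry a custom name.
    static List<String> buildNames(final String customSuppressName) {
        final CounterNameProvider provider = new CounterNameProvider();
        final List<String> names = new ArrayList<>();
        names.add(provider.newProcessorName("KSTREAM-SOURCE-"));
        names.add(orElseGenerateWithPrefix(customSuppressName, provider, "KTABLE-SUPPRESS-"));
        names.add(provider.newProcessorName("KTABLE-TOSTREAM-"));
        return names;
    }

    public static void main(final String[] args) {
        System.out.println(buildNames(null));
        System.out.println(buildNames("asdf"));
    }
}
```

In this sketch the downstream node is `KTABLE-TOSTREAM-0000000002` whether or not the middle node is named, which is the property the expected topology above relies on.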





[GitHub] [kafka] g1geordie commented on a change in pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581597952



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
     public NamedInternal withName(final String name) {
         return new NamedInternal(name);
     }
-    
+
     String suffixWithOrElseGet(final String suffix, final String other) {
-        if (name != null) {
-            return name + suffix;
-        } else {
-            return other;
-        }
+        return Optional.ofNullable(name)
+                // Re-validate generated name as suffixed string could be too large.
+                .map(x -> withName(name + suffix).name)
+                .orElse(other);
     }
 
     String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
-        if (name != null) {
-            provider.newProcessorName(prefix);
+        final String defaultName = provider.newProcessorName(prefix);
 
-            final String suffixed = name + suffix;
-            // Re-validate generated name as suffixed string could be too large.
-            Named.validate(suffixed);
-
-            return suffixed;
-        } else {
-            return provider.newProcessorName(prefix);
-        }
+        return suffixWithOrElseGet(suffix, defaultName);
     }
 
     String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.

Review comment:
       May I ask a question?
    
   Is backward compatibility still maintained in the current version?
   I see a hint in another test:
   
   <pre><code>
   SuppressTopologyTest.shouldApplyNameToSuppressionNode {
           ...
           .suppress(untilTimeLimit(Duration.ofSeconds(1), unbounded()).withName("asdf"))
           ...
          // <del>without</del>(with) the name, <b>the suppression node does not increment the topology index</b>
           assertThat(namedNodeTopology, is(NAMED_INTERMEDIATE_TOPOLOGY));
   }
   </code></pre>







[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867105129


   @bbejeck Thanks for the review.
   You are right!
   Please take another look.
    
   





[GitHub] [kafka] bbejeck edited a comment on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck edited a comment on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815142541


   @g1geordie, thanks for the PR and sorry for the delay in the review.  
   
   Can you rebase this PR to resolve the conflicts?  I plan on reviewing this in the next day or so.  Also, there seem to be some failing tests that are related.
   
   Thanks!





[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-784072487


   @mjsax Hello,
   could you please take a look?





[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815142541


   @g1geordie, thanks for the PR and sorry for the delay in the review.  
   
   Can you rebase this PR?  I plan on reviewing this in the next day or so.  Also, there seem to be some failing tests that are related.
   
   Thanks!





[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867765677


   Thanks @g1geordie for the contribution!





[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867277063


   @bbejeck 
   The failed tests also appear in other issues, so I think they are unrelated.
   ```
   org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```
   Thank you.





[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867764005


   ```
   Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / [3] tlsProtocol=TLSv1.3, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / [3] tlsProtocol=TLSv1.3, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
   15s
   Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – kafka.server.RaftClusterTest
   16s
   Build / JDK 16 and Scala 2.13 / testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
   23s
   Build / JDK 8 and Scala 2.12 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
   16s
   ```
   Failures unrelated to this PR

