You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/24 03:46:58 UTC

[GitHub] [kafka] g1geordie commented on a change in pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…

g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581597952



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
     public NamedInternal withName(final String name) {
         return new NamedInternal(name);
     }
-    
+
     String suffixWithOrElseGet(final String suffix, final String other) {
-        if (name != null) {
-            return name + suffix;
-        } else {
-            return other;
-        }
+        return Optional.ofNullable(name)
+                // Re-validate generated name as suffixed string could be too large.
+                .map(x -> withName(name + suffix).name)
+                .orElse(other);
     }
 
     String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.
-        if (name != null) {
-            provider.newProcessorName(prefix);
+        final String defaultName = provider.newProcessorName(prefix);
 
-            final String suffixed = name + suffix;
-            // Re-validate generated name as suffixed string could be too large.
-            Named.validate(suffixed);
-
-            return suffixed;
-        } else {
-            return provider.newProcessorName(prefix);
-        }
+        return suffixWithOrElseGet(suffix, defaultName);
     }
 
     String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
         // We actually do not need to generate processor names for operation if a name is specified.
         // But before returning, we still need to burn index for the operation to keep topology backward compatibility.

Review comment:
       May I ask a question?
    
   Is Backward compatibility still in the current version?
   I see the hint in other test
   
   <pre><code>
   SuppressTopologyTest.shouldApplyNameToSuppressionNode {
           ...
           .suppress(untilTimeLimit(Duration.ofSeconds(1), unbounded()).withName("asdf"))
           ....
          // <del>without</del>(with) the name,  <b>the suppression node does not increment the topology index </b>
           assertThat(namedNodeTopology, is(NAMED_INTERMEDIATE_TOPOLOGY));
   }
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org