Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 10:04:23 UTC
[GitHub] [kafka] g1geordie opened a new pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
g1geordie opened a new pull request #10190:
URL: https://github.com/apache/kafka/pull/10190
Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867800402
cherry-picked to 2.8
--
[GitHub] [kafka] bbejeck merged pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck merged pull request #10190:
URL: https://github.com/apache/kafka/pull/10190
--
[GitHub] [kafka] g1geordie commented on a change in pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581599900
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
public NamedInternal withName(final String name) {
return new NamedInternal(name);
}
-
+
String suffixWithOrElseGet(final String suffix, final String other) {
- if (name != null) {
- return name + suffix;
- } else {
- return other;
- }
+ return Optional.ofNullable(name)
+ // Re-validate generated name as suffixed string could be too large.
+ .map(x -> withName(name + suffix).name)
+ .orElse(other);
}
String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
// But before returning, we still need to burn index for the operation to keep topology backward compatibility.
- if (name != null) {
- provider.newProcessorName(prefix);
+ final String defaultName = provider.newProcessorName(prefix);
- final String suffixed = name + suffix;
- // Re-validate generated name as suffixed string could be too large.
- Named.validate(suffixed);
-
- return suffixed;
- } else {
- return provider.newProcessorName(prefix);
- }
+ return suffixWithOrElseGet(suffix, defaultName);
}
String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
// But before returning, we still need to burn index for the operation to keep topology backward compatibility.
Review comment:
The current value of NAMED_INTERMEDIATE_TOPOLOGY in SuppressTopologyTest:
<pre><code>
Topologies:
Sub-topology: 0
<b>Source: KSTREAM-SOURCE-0000000000 (topics: [input])</b>
--> KSTREAM-AGGREGATE-0000000002
<b>Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])</b>
--> asdf
<-- KSTREAM-SOURCE-0000000000
<b>Processor: asdf (stores: [asdf-store])</b>
--> KTABLE-TOSTREAM-0000000003
<-- KSTREAM-AGGREGATE-0000000002
<b>Processor: KTABLE-TOSTREAM-0000000003 (stores: [])</b>
--> KSTREAM-SINK-0000000004
<-- asdf
<b>Sink: KSTREAM-SINK-0000000004 (topic: output)</b>
<-- KTABLE-TOSTREAM-0000000003
</code></pre>
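For readers following the `suffixWithOrElseGet(suffix, other)` change in the hunk above, here is a minimal, self-contained sketch of the same idea: build the suffixed name only when a custom name exists, and run it through validation again because the suffix may push it over the length limit. `SuffixSketch`, its `validate` method, and the 249-character limit are assumptions made for this illustration; they are stand-ins, not the Kafka `NamedInternal` or `Named` classes.

```java
import java.util.Optional;

// Illustrative stand-in for NamedInternal; not the Kafka class itself.
class SuffixSketch {
    // Length limit assumed for this sketch only.
    static final int MAX_NAME_LENGTH = 249;

    private final String name; // null when the user supplied no name

    SuffixSketch(final String name) {
        if (name != null) {
            validate(name);
        }
        this.name = name;
    }

    // Hypothetical validation: rejects empty or over-long names.
    static void validate(final String candidate) {
        if (candidate.isEmpty() || candidate.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException("Illegal name: " + candidate);
        }
    }

    // Returns name + suffix (re-validated by the constructor) or the fallback.
    String suffixWithOrElseGet(final String suffix, final String other) {
        return Optional.ofNullable(name)
            .map(n -> new SuffixSketch(n + suffix).name)
            .orElse(other);
    }

    public static void main(final String[] args) {
        // prints "asdf-store"
        System.out.println(new SuffixSketch("asdf").suffixWithOrElseGet("-store", "GENERATED"));
        // prints "GENERATED"
        System.out.println(new SuffixSketch(null).suffixWithOrElseGet("-store", "GENERATED"));
    }
}
```

Routing the suffixed string through the constructor means a name that is valid on its own can still be rejected once the suffix is appended, which is the case the "Re-validate" comment in the diff refers to.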
----------------------------------------------------------------
[GitHub] [kafka] mjsax commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-784444903
Call for review @bbejeck
@g1geordie there seem to be failing tests (`SuppressTopologyTest` and `RepartitionOptimizingTest`) relevant to this change.
----------------------------------------------------------------
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-866910927
@g1geordie apologies for letting this go for so long. I'm looking now and let's see if we can't get this in over the next few days.
--
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867765408
merged #10190 into trunk
--
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815594893
@bbejeck
Thanks for your response!
I see the compatibility claim in NamedInternal.
```
String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
// But before returning, we still need to burn index for the operation to keep topology backward compatibility.
...
}
```
The index is not correct in SuppressTopologyTest.shouldApplyNameToSuppressionNode.
Without the `.withName("asdf")` option on `.suppress(...)`, the topology is:
```
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
--> KTABLE-SUPPRESS-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-SUPPRESS-0000000003 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000004])
--> KTABLE-TOSTREAM-0000000005
<-- KSTREAM-AGGREGATE-0000000002
Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KTABLE-SUPPRESS-0000000003
Sink: KSTREAM-SINK-0000000006 (topic: output)
<-- KTABLE-TOSTREAM-0000000005
```
So I changed the expected indexes to the following.
```
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
--> asdf
<-- KSTREAM-SOURCE-0000000000
Processor: asdf (stores: [asdf-store])
--> KTABLE-TOSTREAM-0000000005
<-- KSTREAM-AGGREGATE-0000000002
Processor: KTABLE-TOSTREAM-0000000005 (stores: [])
--> KSTREAM-SINK-0000000006
<-- asdf
Sink: KSTREAM-SINK-0000000006 (topic: output)
<-- KTABLE-TOSTREAM-0000000005
```
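The index "burning" described in this comment can be sketched in isolation. The example below is a simplified model, not the Kafka Streams implementation: `IndexBurnSketch`, `NameProvider`, and `nodeNames` are hypothetical names, and it assumes that both the suppress node and its state store consume one index each whether or not a custom name is given, which is what the 0000000005 and 0000000006 indexes in the listing above imply.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins; these are not the Kafka Streams classes.
class IndexBurnSketch {
    // Hands out names as prefix + zero-padded, ever-increasing index.
    static final class NameProvider {
        private int index = 0;

        String newName(final String prefix) {
            return String.format("%s%010d", prefix, index++);
        }
    }

    // Always consumes one index; returns the custom name when one is given.
    static String orElseGenerateWithPrefix(final String customName,
                                           final NameProvider provider,
                                           final String prefix) {
        final String generated = provider.newName(prefix); // burns the index
        return customName != null ? customName : generated;
    }

    // Builds processor names in the order used by the topology in this thread.
    static List<String> nodeNames(final String suppressName) {
        final NameProvider provider = new NameProvider();
        final List<String> names = new ArrayList<>();
        names.add(provider.newName("KSTREAM-SOURCE-"));      // index 0
        provider.newName("KSTREAM-AGGREGATE-STATE-STORE-");  // index 1, store name
        names.add(provider.newName("KSTREAM-AGGREGATE-"));   // index 2
        names.add(orElseGenerateWithPrefix(
            suppressName, provider, "KTABLE-SUPPRESS-"));    // index 3
        provider.newName("KTABLE-SUPPRESS-STATE-STORE-");    // index 4, store name
        names.add(provider.newName("KTABLE-TOSTREAM-"));     // index 5
        names.add(provider.newName("KSTREAM-SINK-"));        // index 6
        return names;
    }

    public static void main(final String[] args) {
        System.out.println(nodeNames(null));   // generated suppress name
        System.out.println(nodeNames("asdf")); // custom suppress name
    }
}
```

In this model the nodes after the suppression keep the same generated names in both runs, so adding a name to one operator does not rename anything downstream of it.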
--
[GitHub] [kafka] g1geordie commented on a change in pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on a change in pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#discussion_r581597952
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
##########
@@ -57,40 +59,28 @@ public String name() {
public NamedInternal withName(final String name) {
return new NamedInternal(name);
}
-
+
String suffixWithOrElseGet(final String suffix, final String other) {
- if (name != null) {
- return name + suffix;
- } else {
- return other;
- }
+ return Optional.ofNullable(name)
+ // Re-validate generated name as suffixed string could be too large.
+ .map(x -> withName(name + suffix).name)
+ .orElse(other);
}
String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
// But before returning, we still need to burn index for the operation to keep topology backward compatibility.
- if (name != null) {
- provider.newProcessorName(prefix);
+ final String defaultName = provider.newProcessorName(prefix);
- final String suffixed = name + suffix;
- // Re-validate generated name as suffixed string could be too large.
- Named.validate(suffixed);
-
- return suffixed;
- } else {
- return provider.newProcessorName(prefix);
- }
+ return suffixWithOrElseGet(suffix, defaultName);
}
String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
// We actually do not need to generate processor names for operation if a name is specified.
// But before returning, we still need to burn index for the operation to keep topology backward compatibility.
Review comment:
May I ask a question?
Is backward compatibility still maintained in the current version?
I see a hint in another test:
<pre><code>
SuppressTopologyTest.shouldApplyNameToSuppressionNode {
...
.suppress(untilTimeLimit(Duration.ofSeconds(1), unbounded()).withName("asdf"))
....
// <del>without</del>(with) the name, <b>the suppression node does not increment the topology index </b>
assertThat(namedNodeTopology, is(NAMED_INTERMEDIATE_TOPOLOGY));
}
</code></pre>
----------------------------------------------------------------
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867105129
@bbejeck Thanks for the review.
You are right!
Please help me review again.
--
[GitHub] [kafka] bbejeck edited a comment on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck edited a comment on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815142541
@g1geordie, thanks for the PR and sorry for the delay in the review.
Can you rebase this PR to resolve the conflicts? I plan on reviewing this in the next day or so. Also, there seem to be some failing tests that are related.
Thanks!
--
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-784072487
@mjsax Hello,
Could you please take a look?
----------------------------------------------------------------
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-815142541
@g1geordie, thanks for the PR and sorry for the delay in the review.
Can you rebase this PR? I plan on reviewing this in the next day or so. Also, there seem to be some failing tests that are related.
Thanks!
--
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867765677
Thanks @g1geordie for the contribution!
--
[GitHub] [kafka] g1geordie commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
g1geordie commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867277063
@bbejeck
The failed tests also appear in other issues, so I think they are unrelated to this change.
```
org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
```
Thank you.
--
[GitHub] [kafka] bbejeck commented on pull request #10190: KAFKA-12336 Custom stream naming does not work while calling stream[K…
Posted by GitBox <gi...@apache.org>.
bbejeck commented on pull request #10190:
URL: https://github.com/apache/kafka/pull/10190#issuecomment-867764005
```
Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / [3] tlsProtocol=TLSv1.3, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / [1] tlsProtocol=TLSv1.2, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / [2] tlsProtocol=TLSv1.2, useInlinePem=true – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / [3] tlsProtocol=TLSv1.3, useInlinePem=false – org.apache.kafka.common.network.SslTransportLayerTest
15s
Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateAndManyTopics() – kafka.server.RaftClusterTest
16s
Build / JDK 16 and Scala 2.13 / testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
23s
Build / JDK 8 and Scala 2.12 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
16s
```
Failures unrelated to this PR
--