Posted to jira@kafka.apache.org by "Ivan Ponomarev (Jira)" <ji...@apache.org> on 2021/02/05 08:38:00 UTC
[jira] [Closed] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ivan Ponomarev closed KAFKA-5488.
---------------------------------
The PR has been merged to trunk and will be included in the 2.8.0 release.
> KStream.branch should not return a Array of streams we have to access by known index
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Marcel "childNo͡.de" Trautwein
> Assignee: Ivan Ponomarev
> Priority: Major
> Labels: kip
> Fix For: 2.8.0
>
>
> KIP-418: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> Long story short: it's a mess to get a {{KStream<>[]}} out of {{KStream.branch(Predicate<>...)}}. It breaks the fluent API and leads to code that is hard to maintain, because you have to know the right index for each unnamed branch stream.
> Example:
> {code:java}
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         // EventType is application-specific (not shown); this assumes it has
>         // a boolean instance method validData().
>         KStream<byte[], EventType>[] branchedStreams = new KStreamBuilder()
>             .<byte[], EventType>stream("eventTopic")
>             .branch(
>                 (k, v) -> v.validData(), // index 0: valid records
>                 (k, v) -> true           // index 1: everything else
>             );
>
>         branchedStreams[0]
>             .to("topicValidData");
>
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea: something like {{void branch(final BranchDefinition<Predicate<>, Consumer<KStream<>>>... branchPredicatesAndHandlers);}}, where the code for each branch is nested right where it belongs,
> so it would be possible to write code like this:
> {code:java}
> new KStreamBuilder()
>     .<byte[], EventType>stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> v.validData(), // assumes a boolean method validData() on EventType
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go ahead and evaluate some ideas:
> [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]
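
To make the proposal in the description concrete, here is a minimal, self-contained sketch of the "predicate plus handler" idea using only JDK types. It is not the Kafka Streams API and not necessarily what was merged for KIP-418: {{BranchSketch}}, {{Branch}}, {{branch}} and {{route}} are hypothetical names, records are plain strings held in a list instead of a {{KStream}}, predicates take only the value (no key), and the handler receives each matching record rather than a whole stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class BranchSketch {

    /** Hypothetical pairing of a predicate with the handler for matching records. */
    public static final class Branch<V> {
        final Predicate<V> predicate;
        final Consumer<V> handler;

        private Branch(Predicate<V> predicate, Consumer<V> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }

        public static <V> Branch<V> create(Predicate<V> predicate, Consumer<V> handler) {
            return new Branch<>(predicate, handler);
        }
    }

    /**
     * Hands each record to the first branch whose predicate matches.
     * Records that match no branch are dropped.
     */
    @SafeVarargs
    public static <V> void branch(Iterable<V> records, Branch<V>... branches) {
        for (V record : records) {
            for (Branch<V> b : branches) {
                if (b.predicate.test(record)) {
                    b.handler.accept(record);
                    break; // first match wins; later branches never see this record
                }
            }
        }
    }

    /** Routes events into two in-memory "topics" without any index bookkeeping. */
    public static Map<String, List<String>> route(List<String> events) {
        Map<String, List<String>> topics = new LinkedHashMap<>();
        topics.put("topicValidData", new ArrayList<>());
        topics.put("topicInvalidData", new ArrayList<>());

        branch(events,
            // Stand-in validity check: a record counts as valid if it starts with "ok".
            Branch.<String>create(v -> v.startsWith("ok"),
                                  v -> topics.get("topicValidData").add(v)),
            // Catch-all branch for everything the first predicate rejected.
            Branch.<String>create(v -> true,
                                  v -> topics.get("topicInvalidData").add(v)));
        return topics;
    }

    public static void main(String[] args) {
        // prints {topicValidData=[ok-1, ok-2], topicInvalidData=[broken]}
        System.out.println(route(List.of("ok-1", "broken", "ok-2")));
    }
}
```

The point of the sketch is that each handler sits next to its predicate, so no caller has to remember which array index belongs to which branch.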