Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/07/06 00:35:00 UTC

[jira] [Updated] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

     [ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-4601:
---------------------------------
    Description: 
Consider the following DSL:

{code}
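KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
// map() may change the key, so key-based operators downstream require repartitioning
KStream<String, String> mapped = source.map(..);

// Both the aggregation and the join below read from the same mapped stream
KTable<String, Long> counts = mapped
        .groupByKey()
        .count("Counts");

KStream<String, String> sink = mapped.leftJoin(counts, ..);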
{code}

The resulting topology looks like this:

{code}
ProcessorTopology:
				KSTREAM-SOURCE-0000000000:
					topics:		[topic1]
					children:	[KSTREAM-MAP-0000000001]
				KSTREAM-MAP-0000000001:
					children:	[KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
				KSTREAM-FILTER-0000000004:
					children:	[KSTREAM-SINK-0000000003]
				KSTREAM-SINK-0000000003:
					topic:		X-Counts-repartition
				KSTREAM-FILTER-0000000007:
					children:	[KSTREAM-SINK-0000000006]
				KSTREAM-SINK-0000000006:
					topic:		X-KSTREAM-MAP-0000000001-repartition

ProcessorTopology:
				KSTREAM-SOURCE-0000000008:
					topics:		[X-KSTREAM-MAP-0000000001-repartition]
					children:	[KSTREAM-LEFTJOIN-0000000009]
				KSTREAM-LEFTJOIN-0000000009:
					states:		[Counts]
				KSTREAM-SOURCE-0000000005:
					topics:		[X-Counts-repartition]
					children:	[KSTREAM-AGGREGATE-0000000002]
				KSTREAM-AGGREGATE-0000000002:
					states:		[Counts]
{code}

That is, there are two repartition topics, one for the aggregation and one for the join. This not only introduces unnecessary overhead but also breaks the expected processing order (users expect each record to go through the aggregation first and then the join operator). To get the simpler topology below, users today must manually add a {{through}} operator after {{map}} to enforce repartitioning.

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
// through() writes the mapped stream to "topic2" and reads it back, repartitioning it once
KStream<String, String> repartitioned = source.map(..).through("topic2");

KTable<String, Long> counts = repartitioned
        .groupByKey()
        .count("Counts");

KStream<String, String> sink = repartitioned.leftJoin(counts, ..);
{code}

The resulting topology will then look like this:

{code}
ProcessorTopology:
				KSTREAM-SOURCE-0000000000:
					topics:		[topic1]
					children:	[KSTREAM-MAP-0000000001]
				KSTREAM-MAP-0000000001:
					children:	[KSTREAM-SINK-0000000002]
				KSTREAM-SINK-0000000002:
					topic:		topic2

ProcessorTopology:
				KSTREAM-SOURCE-0000000003:
					topics:		[topic2]
					children:	[KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
				KSTREAM-AGGREGATE-0000000004:
					states:		[Counts]
				KSTREAM-LEFTJOIN-0000000005:
					states:		[Counts]
{code} 
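
To make the ordering concern concrete, here is a minimal sketch in plain Java (no Kafka dependency) that contrasts the two topologies above. It is a simulation under stated assumptions, not Streams code: the class and method names are hypothetical, the "Counts" store is modelled as a HashMap, and the two-topic case shows only one possible interleaving, in which the join's repartition topic happens to be consumed before the aggregate's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionOrderingSketch {

    // One shared topic: every record is aggregated first and then joined, in record order.
    static List<String> sharedTopic(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();   // stands in for the "Counts" store
        List<String> joined = new ArrayList<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);         // aggregate
            joined.add(key + "=" + counts.get(key));  // left join against the store
        }
        return joined;
    }

    // Two separate topics: ASSUMED interleaving where the join's topic is drained
    // before the aggregate's topic, so the join never sees an updated count.
    static List<String> separateTopics(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (String key : keys) {
            joined.add(key + "=" + counts.get(key));  // store is still empty: null
        }
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);         // aggregation catches up afterwards
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "b");
        System.out.println(sharedTopic(keys));     // [a=1, a=2, b=1]
        System.out.println(separateTopics(keys));  // [a=null, a=null, b=null]
    }
}
```

In a real application the interleaving across two topics is not fixed, so the join results could fall anywhere between these two extremes; the point is only that a single shared topic makes the aggregate-then-join order deterministic.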

Streams should perform this kind of optimization automatically; we can consider doing so when extending the DSL translation beyond one operator at a time.
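
As a rough illustration of what such a pass could decide, the sketch below compares one-operator-at-a-time translation with a graph-level view. It is not how Streams is or will be implemented: the Op class, the method names, and the topic naming scheme are all hypothetical, and the only input is which key-changing upstream node each operator reads from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RepartitionMergeSketch {

    // Hypothetical model of a downstream operator that needs its input partitioned by key.
    static final class Op {
        final String name;
        final String keyChangingParent;  // upstream node that may have changed the key

        Op(String name, String keyChangingParent) {
            this.name = name;
            this.keyChangingParent = keyChangingParent;
        }
    }

    // One-operator-at-a-time translation: every operator gets its own repartition topic.
    static List<String> perOperator(List<Op> ops) {
        List<String> topics = new ArrayList<>();
        for (Op op : ops) {
            topics.add(op.name + "-repartition");
        }
        return topics;
    }

    // Graph-level view: operators sharing a key-changing parent share one repartition topic.
    static Map<String, List<String>> merged(List<Op> ops) {
        Map<String, List<String>> consumersByTopic = new LinkedHashMap<>();
        for (Op op : ops) {
            consumersByTopic
                    .computeIfAbsent(op.keyChangingParent + "-repartition", t -> new ArrayList<>())
                    .add(op.name);
        }
        return consumersByTopic;
    }

    public static void main(String[] args) {
        List<Op> ops = Arrays.asList(new Op("count", "map"), new Op("leftJoin", "map"));
        System.out.println(perOperator(ops));  // [count-repartition, leftJoin-repartition]
        System.out.println(merged(ops));       // {map-repartition=[count, leftJoin]}
    }
}
```

For the example in this ticket, the merged view yields a single topic feeding both the aggregation and the join, which is the same shape the manual {{through}} workaround produces.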

  was:
Consider the following DSL:

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);

KTable<String, Long> counts = source
        .groupByKey()
        .count("Counts");

KStream<String, String> sink = source.leftJoin(counts, ..);
{code}

The resulting topology looks like this:

{code}
ProcessorTopology:
				KSTREAM-SOURCE-0000000000:
					topics:		[topic1]
					children:	[KSTREAM-MAP-0000000001]
				KSTREAM-MAP-0000000001:
					children:	[KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
				KSTREAM-FILTER-0000000004:
					children:	[KSTREAM-SINK-0000000003]
				KSTREAM-SINK-0000000003:
					topic:		X-Counts-repartition
				KSTREAM-FILTER-0000000007:
					children:	[KSTREAM-SINK-0000000006]
				KSTREAM-SINK-0000000006:
					topic:		X-KSTREAM-MAP-0000000001-repartition

ProcessorTopology:
				KSTREAM-SOURCE-0000000008:
					topics:		[X-KSTREAM-MAP-0000000001-repartition]
					children:	[KSTREAM-LEFTJOIN-0000000009]
				KSTREAM-LEFTJOIN-0000000009:
					states:		[Counts]
				KSTREAM-SOURCE-0000000005:
					topics:		[X-Counts-repartition]
					children:	[KSTREAM-AGGREGATE-0000000002]
				KSTREAM-AGGREGATE-0000000002:
					states:		[Counts]
{code}

That is, there are two repartition topics, one for the aggregation and one for the join. This not only introduces unnecessary overhead but also breaks the expected processing order (users expect each record to go through the aggregation first and then the join operator). To get the simpler topology below, users today must manually add a {{through}} operator after {{map}} to enforce repartitioning.

{code}
ProcessorTopology:
				KSTREAM-SOURCE-0000000000:
					topics:		[topic1]
					children:	[KSTREAM-MAP-0000000001]
				KSTREAM-MAP-0000000001:
					children:	[KSTREAM-SINK-0000000002]
				KSTREAM-SINK-0000000002:
					topic:		topic2

ProcessorTopology:
				KSTREAM-SOURCE-0000000003:
					topics:		[topic2]
					children:	[KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
				KSTREAM-AGGREGATE-0000000004:
					states:		[Counts]
				KSTREAM-LEFTJOIN-0000000005:
					states:		[Counts]
{code} 

Streams should perform this kind of optimization automatically; we can consider doing so when extending the DSL translation beyond one operator at a time.


> Avoid duplicated repartitioning in KStream DSL
> ----------------------------------------------
>
>                 Key: KAFKA-4601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: performance
>


