Posted to dev@kafka.apache.org by "Walker Carlson (Jira)" <ji...@apache.org> on 2019/12/13 22:18:00 UTC
[jira] [Created] (KAFKA-9299) Over eager optimization
Walker Carlson created KAFKA-9299:
-------------------------------------
Summary: Over eager optimization
Key: KAFKA-9299
URL: https://issues.apache.org/jira/browse/KAFKA-9299
Project: Kafka
Issue Type: Task
Components: streams
Reporter: Walker Carlson
There are a few cases where the optimizer applies an optimization that can cause a copartitioning failure. Known cases involve join and cogroup, but merge and other operators could also be affected.
Take for example three input topics A, B and C with 2, 3 and 4 partitions respectively.
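B' = B.map();   // key-changing map, so B' must be repartitioned before each join
B'.join(A)      // B' must be copartitioned with A (2 partitions)
B'.join(C)      // B' must be copartitioned with C (4 partitions)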
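The optimizer pushes the repartition upstream: instead of giving B' one repartition topic per join, it merges them into a single repartition topic directly after the map. That one topic cannot have both 2 and 4 partitions, so the copartitioning check fails.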
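To make the failure concrete, here is a small stdlib-only Java model of the copartitioning requirement for the A/B/C example. It is a simplified sketch under stated assumptions, not Kafka Streams code: the class and method names are hypothetical, and it assumes that without optimization each join gets its own repartition topic for B' sized to match the other join input.

```java
import java.util.stream.IntStream;

// Hypothetical, simplified model of the A/B/C example; NOT Kafka Streams code.
// B's own partition count (3) does not matter, because B' is repartitioned.
final class CopartitionModel {
    static final int A = 2; // partitions of topic A
    static final int C = 4; // partitions of topic C

    // A join is only valid when both inputs have the same partition count.
    static boolean copartitioned(int left, int right) {
        return left == right;
    }

    // Assumed unoptimized layout: one repartition topic for B' per join,
    // each sized to match the other input of that join.
    static boolean unoptimizedValid() {
        int bPrimeForA = A; // repartition topic feeding B'.join(A)
        int bPrimeForC = C; // repartition topic feeding B'.join(C)
        return copartitioned(bPrimeForA, A) && copartitioned(bPrimeForC, C);
    }

    // Optimized layout: both joins read the SAME merged repartition topic,
    // so a single partition count n would have to equal both A and C.
    static boolean optimizedValid(int maxPartitions) {
        return IntStream.rangeClosed(1, maxPartitions)
                .anyMatch(n -> copartitioned(n, A) && copartitioned(n, C));
    }

    public static void main(String[] args) {
        System.out.println("unoptimized valid: " + unoptimizedValid()); // true
        System.out.println("optimized valid:   " + optimizedValid(64)); // false
    }
}
```

The unoptimized layout passes because each repartition topic can be sized independently; the merged layout fails for every candidate partition count, since no single number equals both 2 and 4.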
This can be seen with the following test:
@Test
public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
    // stringConsumed, STRING_AGGREGATOR and STRING_INITIALIZER are fixtures
    // defined elsewhere in the test class (not shown here).
    final StreamsBuilder builder = new StreamsBuilder();
    final Properties properties = new Properties();
    properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

    final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
    // Key-changing map, so grouping requires a repartition topic ("foo-repartition").
    final KGroupedStream<String, String> groupedOne =
        stream1.map((k, v) -> new KeyValue<>(v, k)).groupByKey(Grouped.as("foo"));

    // The same cogrouped stream is aggregated twice.
    final CogroupedKStream<String, String> one = groupedOne.cogroup(STRING_AGGREGATOR);
    one.aggregate(STRING_INITIALIZER);
    one.aggregate(STRING_INITIALIZER);

    final String topologyDescription = builder.build(properties).describe().toString();
    System.err.println(topologyDescription);
}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [one])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> foo-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: foo-repartition-filter (stores: [])
      --> foo-repartition-sink
      <-- KSTREAM-MAP-0000000001
    Sink: foo-repartition-sink (topic: foo-repartition)
      <-- foo-repartition-filter

   Sub-topology: 1
    Source: foo-repartition-source (topics: [foo-repartition])
      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> COGROUPKSTREAM-MERGE-0000000007
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
      --> COGROUPKSTREAM-MERGE-0000000013
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000006
    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000012
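The test above only prints the description, so the merge has to be spotted by eye: both COGROUPKSTREAM-AGGREGATE processors read from the single foo-repartition-source. A minimal stdlib-only sketch of checking this programmatically is shown here. `RepartitionTopics` and `sinkTopics` are hypothetical names, and the regex assumes the `Sink: <name> (topic: <topic>)` line format seen in the output, which is human-readable text rather than a stable API. In a real test, walking `TopologyDescription#subtopologies()` and inspecting `TopologyDescription.Sink#topic()` would be sturdier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts sink topic names from the text produced by
// TopologyDescription#toString(). Assumes the "Sink: <name> (topic: <topic>)"
// format shown in the output above; that format is not a stable API.
final class RepartitionTopics {
    private static final Pattern SINK =
            Pattern.compile("Sink: \\S+ \\(topic: ([^)]+)\\)");

    static List<String> sinkTopics(String description) {
        List<String> topics = new ArrayList<>();
        Matcher m = SINK.matcher(description);
        while (m.find()) {
            topics.add(m.group(1)); // the topic name inside "(topic: ...)"
        }
        return topics;
    }

    public static void main(String[] args) {
        // Abbreviated excerpt of the description printed by the test.
        String description = String.join("\n",
                "Topologies:",
                "   Sub-topology: 0",
                "    Sink: foo-repartition-sink (topic: foo-repartition)",
                "      <-- foo-repartition-filter",
                "   Sub-topology: 1",
                "    Source: foo-repartition-source (topics: [foo-repartition])");
        // Only one repartition topic exists; both aggregations share it.
        System.out.println(sinkTopics(description)); // [foo-repartition]
    }
}
```

A test could then assert that the optimized topology yields exactly one repartition sink topic, making the merge visible without reading the printed output.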
--
This message was sent by Atlassian Jira
(v8.3.4#803005)