Posted to dev@kafka.apache.org by "Walker Carlson (Jira)" <ji...@apache.org> on 2019/12/13 22:18:00 UTC

[jira] [Created] (KAFKA-9299) Over eager optimization

Walker Carlson created KAFKA-9299:
-------------------------------------

             Summary: Over eager optimization
                 Key: KAFKA-9299
                 URL: https://issues.apache.org/jira/browse/KAFKA-9299
             Project: Kafka
          Issue Type: Task
          Components: streams
            Reporter: Walker Carlson


There are a few cases where the optimizer will attempt an optimization that can cause a copartitioning failure. Known cases are related to join and cogroup, but the problem could also affect merge or other operations.

Take, for example, three input topics A, B, and C with 2, 3, and 4 partitions, respectively.

B' = B.map();

B'.join(A)

B'.join(C)

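The optimizer will push the repartition upstream, so that a single repartition topic directly after the map feeds both joins. This causes the copartitioning to fail: one repartition topic cannot have 2 partitions to match A and 4 partitions to match C at the same time.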
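
To make the conflict concrete, here is a minimal, hypothetical Java model. It is not Kafka Streams code; `CopartitionSketch`, `JoinRequirement`, and the topic names are invented for illustration. It assumes only the copartitioning rule that both inputs of a join must have the same number of partitions, and that a repartition topic has exactly one partition count. Without the optimization each join reads B' from its own repartition topic; with it, both joins share one topic that would need 2 and 4 partitions at once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model, not Kafka Streams code: it only encodes the rule that both
// inputs of a join must have the same partition count, and that a repartition
// topic has exactly one partition count.
class CopartitionSketch {

    // One join on the re-keyed stream B': the repartition topic it reads B' from,
    // and the partition count of the other join input (A or C).
    static final class JoinRequirement {
        final String repartitionTopic;
        final int otherSidePartitions;

        JoinRequirement(String repartitionTopic, int otherSidePartitions) {
            this.repartitionTopic = repartitionTopic;
            this.otherSidePartitions = otherSidePartitions;
        }
    }

    // For each repartition topic, collect every partition count some join demands of it.
    static Map<String, Set<Integer>> demandedCounts(List<JoinRequirement> joins) {
        Map<String, Set<Integer>> demanded = new LinkedHashMap<>();
        for (JoinRequirement join : joins) {
            demanded.computeIfAbsent(join.repartitionTopic, topic -> new TreeSet<>())
                    .add(join.otherSidePartitions);
        }
        return demanded;
    }

    // A topic asked for more than one partition count cannot be copartitioned with all of its joins.
    static List<String> conflicts(List<JoinRequirement> joins) {
        List<String> conflicting = new ArrayList<>();
        demandedCounts(joins).forEach((topic, counts) -> {
            if (counts.size() > 1) {
                conflicting.add(topic);
            }
        });
        return conflicting;
    }

    public static void main(String[] args) {
        final int partitionsA = 2; // topic A from the example
        final int partitionsC = 4; // topic C from the example

        // Without the optimization: each join gets its own repartition topic for B'.
        List<JoinRequirement> separate = List.of(
                new JoinRequirement("B'-repartition-for-A", partitionsA),
                new JoinRequirement("B'-repartition-for-C", partitionsC));

        // With the optimization: both joins share one repartition topic after the map.
        List<JoinRequirement> merged = List.of(
                new JoinRequirement("B'-repartition", partitionsA),
                new JoinRequirement("B'-repartition", partitionsC));

        System.out.println("separate topics, conflicts: " + conflicts(separate));
        System.out.println("merged topic, conflicts:    " + conflicts(merged));
        System.out.println("merged topic, demanded:     " + demandedCounts(merged));
    }
}
```

Running main prints an empty conflict list for the separate topics and [B'-repartition] for the merged one, whose demanded partition counts are [2, 4].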

This can be seen with the following test:

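    // stringConsumed, STRING_AGGREGATOR and STRING_INITIALIZER are defined in the enclosing test class (not shown).
    @Test
    public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Enable topology optimization, which allows repartition topics to be merged.
        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);

        // map() changes the key, so grouping afterwards needs a repartition topic (named "foo-repartition").
        final KGroupedStream<String, String> groupedOne = stream1.map((k, v) -> new KeyValue<>(v, k)).groupByKey(Grouped.as("foo"));

        // The same cogrouped stream is aggregated twice.
        final CogroupedKStream<String, String> one = groupedOne.cogroup(STRING_AGGREGATOR);
        one.aggregate(STRING_INITIALIZER);
        one.aggregate(STRING_INITIALIZER);

        final String topologyDescription = builder.build(properties).describe().toString();

        // Print the topology for inspection.
        System.err.println(topologyDescription);
    }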

Output (both aggregates read from the single foo-repartition topic):

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [one])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> foo-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: foo-repartition-filter (stores: [])
      --> foo-repartition-sink
      <-- KSTREAM-MAP-0000000001
    Sink: foo-repartition-sink (topic: foo-repartition)
      <-- foo-repartition-filter

  Sub-topology: 1
    Source: foo-repartition-source (topics: [foo-repartition])
      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> COGROUPKSTREAM-MERGE-0000000007
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
      --> COGROUPKSTREAM-MERGE-0000000013
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000006
    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000012

--
This message was sent by Atlassian Jira
(v8.3.4#803005)