Posted to dev@kafka.apache.org by "Michael Viamari (Jira)" <ji...@apache.org> on 2019/11/21 17:14:00 UTC

[jira] [Created] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation

Michael Viamari created KAFKA-9222:
--------------------------------------

             Summary: StreamPartitioner for internal repartition topics does not match defaults for to() operation
                 Key: KAFKA-9222
                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.1
            Reporter: Michael Viamari


When a KStream has a `Windowed` key, a different `StreamPartitioner` is selected depending on how the stream's sink node is created.

When using `KStream#to()`, the topology uses a `StreamSinkNode`. If no partitioner is provided, this node chooses a `WindowedStreamPartitioner` when it creates the `SinkNode` for the topology.
{code:java}
KTable<Windowed<K>, VR> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}
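The selection rule for `to()` can be summarised with a small, self-contained model. This is a hypothetical sketch, not Kafka Streams source: the class `ToSinkModel`, its method, and the partitioner name `MyPartitioner` are illustrative, and partitioners are represented as plain strings. It assumes the choice depends only on whether an explicit partitioner was supplied and whether the key is `Windowed` (in Kafka Streams itself the windowed case appears to be detected from the key serializer).

```java
// Hypothetical model of how a KStream#to() sink picks its partitioner, as described
// in this report. Names are illustrative; this is not Kafka Streams source code.
public class ToSinkModel {

    /** Returns the partitioner a to() sink would use in this simplified model. */
    static String partitionerFor(String explicitPartitioner, boolean keyIsWindowed) {
        if (explicitPartitioner != null) {
            return explicitPartitioner;            // a user-supplied partitioner always wins
        }
        if (keyIsWindowed) {
            return "WindowedStreamPartitioner";    // chosen automatically for Windowed keys
        }
        return "producer default partitioner";     // null partitioner: the producer decides
    }

    public static void main(String[] args) {
        System.out.println(partitionerFor(null, true));
        System.out.println(partitionerFor(null, false));
        System.out.println(partitionerFor("MyPartitioner", true));
    }
}
```

Running `main` prints the three labels in order: the windowed partitioner, the producer default, and the explicitly supplied one.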
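When an internal repartition topic is created before a stateful operation, an `OptimizableRepartitionNode` is used, which also adds a `SinkNode` to the topology. This node is created with a null partitioner, so records written to the repartition topic are always partitioned by the producer's default partitioner. This becomes an issue when joining a windowed stream or KTable with a stream whose key was mapped to a `Windowed` key: the `WindowedStreamPartitioner` hashes only the serialized base key, whereas the default partitioner hashes the full serialized windowed key, so records with the same key can end up on different partitions on the two sides of the join.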
{code:java}
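// K, V and VR stand for the application's key, value and aggregate types
KTable<Windowed<K>, VR> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);

// map() changes the key, so the join below triggers an internal repartition topic
KStream<Windowed<K>, V> windowedStream = inputStream.map((k, v) -> {
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values()); // application-specific helper
    return KeyValue.pair(new Windowed<>(k, minW), v);
});
windowedStream.leftJoin(windowedAgg, ....);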
{code}
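To see why the join can miss matches, the sketch below contrasts the two partitioning schemes. It is a hypothetical, self-contained model: it assumes the time-windowed key is serialized as the base-key bytes followed by the 8-byte window start, and it uses `java.util.Arrays.hashCode` as a stand-in for Kafka's murmur2 hash, so the concrete partition numbers are illustrative only. The key `A`, the window starts 0 and 60000 ms, and the 4 partitions are arbitrary example values. What carries over is the structure: hashing only the base key sends every window of a key to the same partition, while hashing the whole windowed key generally does not.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model contrasting the two partitioning schemes. Arrays.hashCode is a
// stand-in for Kafka's murmur2, so the partition numbers are illustrative only.
public class PartitionMismatch {

    static int toPartition(byte[] bytes, int numPartitions) {
        // Clear the sign bit so the modulus is non-negative.
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Assumed windowed-key layout: base-key bytes followed by the 8-byte window start.
    static byte[] windowedKeyBytes(String key, long windowStart) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStart)
                .array();
    }

    // Like WindowedStreamPartitioner: only the base key is hashed (windowStart is ignored).
    static int baseKeyPartition(String key, long windowStart, int numPartitions) {
        return toPartition(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Like the producer default partitioner: the full serialized windowed key is hashed.
    static int fullKeyPartition(String key, long windowStart, int numPartitions) {
        return toPartition(windowedKeyBytes(key, windowStart), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (long start : new long[] {0L, 60_000L}) {
            System.out.printf("key=A window=%d base-key partition=%d full-key partition=%d%n",
                    start,
                    baseKeyPartition("A", start, partitions),
                    fullKeyPartition("A", start, partitions));
        }
    }
}
```

With these stand-in values, both schemes put key `A` on partition 0 for the window starting at 0, but for the window starting at 60000 the full-key scheme moves it to partition 2 while the base-key scheme keeps it on partition 0.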
The only workarounds I've found are either to use the default partitioner for the `KStream#to()` operation, or to repartition explicitly with `KStream#through()` instead of relying on the internal repartition topic.
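The first workaround helps because both sinks then fall back to the same default logic. A minimal sketch of that idea, again a hypothetical model rather than Kafka code: `SinkPartitioner` is a stand-in shaped like Kafka's `StreamPartitioner#partition(topic, key, value, numPartitions)`, and, following our reading of that interface's contract, returning `null` means "use the default partitioning". In a real topology such a partitioner would presumably be passed to `to()` via `Produced#withStreamPartitioner`. `WindowedKey`, the topic names, and the use of `Arrays.hashCode` in place of murmur2 are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model of the first workaround: give the to() sink a partitioner that
// returns null, so it defers to the same default logic the repartition topic uses.
public class DeferToDefaultWorkaround {

    /** Stand-in shaped like Kafka's StreamPartitioner; null means "use the default". */
    public interface SinkPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    /** Simplified stand-in for Windowed<String>: base key plus window start. */
    public static final class WindowedKey {
        final String key;
        final long windowStart;

        public WindowedKey(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    // Stand-in for the producer default: hash the whole serialized windowed key.
    // Arrays.hashCode replaces Kafka's murmur2 here.
    static int defaultPartition(WindowedKey k, int numPartitions) {
        byte[] base = k.key.getBytes(StandardCharsets.UTF_8);
        byte[] full = ByteBuffer.allocate(base.length + Long.BYTES)
                .put(base)
                .putLong(k.windowStart)
                .array();
        return (Arrays.hashCode(full) & 0x7fffffff) % numPartitions;
    }

    // Resolves the final partition: a null partitioner or a null result means "default".
    static <V> int resolve(SinkPartitioner<WindowedKey, V> partitioner, String topic,
                           WindowedKey key, V value, int numPartitions) {
        Integer p = partitioner == null ? null : partitioner.partition(topic, key, value, numPartitions);
        return p != null ? p : defaultPartition(key, numPartitions);
    }

    public static void main(String[] args) {
        SinkPartitioner<WindowedKey, String> deferToDefault = (topic, key, value, n) -> null;
        WindowedKey windowedKey = new WindowedKey("A", 60_000L);
        int toTopic = resolve(deferToDefault, "agg-stream-topic", windowedKey, "v", 4);
        int repartition = resolve(null, "repartition-topic", windowedKey, "v", 4);
        // Both sinks used the same default logic, so they agree on the partition.
        System.out.println(toTopic == repartition);
    }
}
```

Running `main` prints `true`. The trade-off is that the `to()` topic is then partitioned by the full windowed key rather than by the base key alone.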



--
This message was sent by Atlassian Jira
(v8.3.4#803005)