Posted to dev@kafka.apache.org by "Michael Viamari (Jira)" <ji...@apache.org> on 2019/11/21 17:14:00 UTC
[jira] [Created] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation
Michael Viamari created KAFKA-9222:
--------------------------------------
Summary: StreamPartitioner for internal repartition topics does not match defaults for to() operation
Key: KAFKA-9222
URL: https://issues.apache.org/jira/browse/KAFKA-9222
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.1
Reporter: Michael Viamari
When a KStream has a `Windowed` key, a different `StreamPartitioner` is selected depending on how the stream sink is generated.
With `KStream#to()`, the topology uses a `StreamSinkNode`. If no partitioner is provided, it chooses a `WindowedStreamPartitioner` when it creates the `SinkNode` for the topology.
{code:java}
// Windowed aggregation written out with to() and no explicit partitioner
KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}
When an internal repartition is created before a stateful operation, an `OptimizableRepartitionNode` is used, which also adds a `SinkNode` to the topology. That node is created with a null partitioner, so it always falls back to the Producer's default partitioner. This becomes an issue when joining a windowed stream/KTable with a stream whose records were mapped to a windowed key.
{code:java}
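// Sink created by to(): partitioned with WindowedStreamPartitioner
KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);

// Re-key each record to the earliest window that contains its timestamp
KStream<> windowedStream = inputStream.map((k, v) -> {
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values());
    return KeyValue.pair(new Windowed<>(k, minW), v);
});

// The join triggers an internal repartition, whose sink has a null partitioner
windowedStream.leftJoin(windowedAgg, ...);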
{code}
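To make the mismatch concrete, here is a minimal stand-alone sketch that uses no Kafka classes. It rests on assumptions: as far as I understand it, `WindowedStreamPartitioner` hashes only the base key, while the Producer default hashes the full serialized key, which for a windowed key also carries the window start. The class name, the helper names, the byte layout, and the use of `Arrays.hashCode` in place of Kafka's murmur2 are all hypothetical and chosen only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: none of these names exist in Kafka.
final class WindowedPartitionSketch {

    // Assumed wire format: base key bytes followed by the 8-byte window start.
    static byte[] serializeWindowedKey(String key, long windowStartMs) {
        byte[] base = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(base.length + Long.BYTES)
                .put(base)
                .putLong(windowStartMs)
                .array();
    }

    // Stand-in hash; Kafka itself uses murmur2, not Arrays.hashCode.
    static int toPartition(byte[] bytes, int numPartitions) {
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Models a partitioner that looks at the base key only and ignores the window.
    static int baseKeyPartition(String key, long windowStartMs, int numPartitions) {
        return toPartition(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Models a partitioner that hashes every serialized byte, window start included.
    static int fullKeyPartition(String key, long windowStartMs, int numPartitions) {
        return toPartition(serializeWindowedKey(key, windowStartMs), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        for (long start : new long[] {0L, 1L}) {
            System.out.printf("window start=%d base-key partition=%d full-key partition=%d%n",
                    start,
                    baseKeyPartition("a", start, numPartitions),
                    fullKeyPartition("a", start, numPartitions));
        }
    }
}
```

In this model, two records with the same base key but different windows always share a partition under the base-key scheme, but can be split across partitions under the full-key scheme. If the two sides of the join are written with different schemes, matching keys are not guaranteed to be co-partitioned.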
The only workaround I've found is either to use the default partitioner for the `KStream#to()` operation or to use `KStream#through()` for the repartition operation.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)