Posted to dev@kafka.apache.org by "Abdullah alkhawatrah (Jira)" <ji...@apache.org> on 2023/10/12 10:17:00 UTC

[jira] [Created] (KAFKA-15595) Session window aggregate drops records headers

Abdullah alkhawatrah created KAFKA-15595:
--------------------------------------------

             Summary: Session window aggregate drops records headers
                 Key: KAFKA-15595
                 URL: https://issues.apache.org/jira/browse/KAFKA-15595
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1
            Reporter: Abdullah alkhawatrah


Hey,

While upgrading from 3.2.X to 3.5.1, I noticed a change in the SessionWindow aggregate behaviour: custom headers added before the aggregate now appear to be dropped.

I could reproduce the behaviour with the following test topology:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(EARLIEST))
        .process(() -> new Processor<Object, Object, Object, Object>() {
            private ProcessorContext<Object, Object> context;

            @Override
            public void init(final ProcessorContext<Object, Object> context) {
                this.context = context;
            }

            @Override
            public void process(Record<Object, Object> record) {
                record.headers().add("key1", record.value().toString().getBytes());
                context.forward(record);
            }
        })
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
        .aggregate(() -> 1,
                (key, value, aggregate) -> aggregate,
                (aggKey, aggOne, aggTwo) -> aggTwo)
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key(), value))
        .to(outputTopic);
{code}
Checking the events in the `outputTopic` shows that the headers are empty. With 3.2.*, the same topology propagated the headers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)