Posted to dev@kafka.apache.org by David Sebastián Manjo <ds...@sonoc.io> on 2020/09/14 13:30:11 UTC

Java Kafka Streams

Dear team,

I have a question, but I don't see any documentation about it. I hope you can help me.
I have a Kafka producer and a Kafka consumer, both written in Java (Java 11).

The Kafka producer has a start moment and an end moment. The Kafka consumer, on the other hand, works with windowed aggregate operations. The business logic is:

final KStream<String, AggregateDto> transactions =
        builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

// Topology: group, window, aggregate, hold results until the window closes, then sink
transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy(TimeWindows
                .of(Duration.ofSeconds(kafkaProperties.getAggregateSeconds()))
                .grace(Duration.ofMillis(kafkaProperties.getGraceAggregateMillis())))
        .aggregate(
                tool::emptyAggregate,
                this::processNewRecord,
                Materialized.<String, AggregateDto, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

Everything is working properly and Kafka is helping a lot, but we have a problem. As I said before, the Kafka producer has a start and an end moment, and the Kafka consumer works with windows.

Please imagine we need N windows to consume all the data from the Kafka producer; the Kafka consumer processes all data from window 0 to window N-1.
We cannot process the last window because Kafka Streams retains it (to maintain data consistency, a window is only emitted once stream time passes its end plus the grace period), but the producer will never send data again, so stream time never advances that far.

Is there any way to consume the last window (for instance after 15 minutes or 1 hour…) in the Kafka consumer?


Thanks in advance for all the information and help.



Best regards.


Sebastián Manjón David
Senior Java Software Engineer
dsebastian@sonoc.io
SONOC
C/ Josefa Amar y Borbón, 10, 4ª · 50001 Zaragoza, España
Tel: +34 917019888 · www.sonoc.io

Re: Java Kafka Streams

Posted by John Roesler <vv...@apache.org>.
Hello Sebastián,

Thanks for the question. I think the reason you don't find
documentation for this is that it's quite unusual to have a
data stream that just stops at some point and never sends
any more data.

Can you elaborate a little whether you meant that the whole
input topic just "ends" at some point, or are there maybe
just different producers responsible for different keys, and
each key has a finite lifespan?

As you noted, the issue is that the "untilWindowCloses"
directive uses stream time, which stops advancing when the
input stream stops. If it's really the case that the whole
input topic has an "end", and if the producer knows when the
end is reached, one thing it could do is produce one last
"end of topic" message to every partition, which has a
timestamp calculated to advance the stream time of every
partition. This might be tricky if you have other
intermediate repartition operations, though.
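
For illustration, here is a rough sketch of what that producer-side marker could look like. The topic name, window/grace durations, and marker key are hypothetical stand-ins for your real configuration, and your Streams app would need to guard against the marker's null value in the groupBy() key selector (or filter it out entirely):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class EndOfTopicMarker {

    public static void main(String[] args) {
        // Hypothetical values; in the real app these come from configuration.
        final String topic = "transactions";
        final long windowSizeMs = 60_000L;
        final long graceMs = 5_000L;

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A timestamp far enough in the future to push stream time past
            // the end of the last open window plus its grace period.
            long advanceTs = System.currentTimeMillis() + windowSizeMs + graceMs;

            // One marker per partition, because stream time is tracked per partition.
            int partitions = producer.partitionsFor(topic).size();
            for (int p = 0; p < partitions; p++) {
                producer.send(new ProducerRecord<>(topic, p, advanceTs, "end-of-topic", null));
            }
            producer.flush();
        }
    }
}

A null value is convenient here because the aggregation itself ignores null-value records, so the marker advances stream time without contributing to any aggregate, but the key selector in groupBy() still sees it and must tolerate the null.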

I hope this helps,
-John
