Posted to users@kafka.apache.org by Kai Jiang <ji...@gmail.com> on 2019/01/25 20:44:38 UTC

Re: Kafka stream only consume n messages


On 2018/09/06 00:32:14, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> 1. There is no API for this.
> 
> 2. I guess it might be possible, but might not be the best way to do it.
> 
> 3. That is also not possible.
> 
> 
> I would recommend something like this:
> 
> 
> > final AtomicBoolean shutdown = new AtomicBoolean(false);
> > 
> > StreamsBuilder builder = ...
> > 
> > KStream<K, V> stream = builder.stream(...);
> > stream.foreach(new ForeachAction<K, V>() {
> >     int processedMessages = 0;
> >     public void apply(K key, V value) {
> >       if (++processedMessages >= 100) {
> >           shutdown.set(true);
> >       }
> >     }
> > });
> > 
> > stream.XXXX    // apply regular business logic
> > 
> > KafkaStreams streams = ...
> > 
> > streams.start();
> > 
> > while (!shutdown.get()) {
> >     Thread.sleep(100);    // 100 ms
> > }
> > 
> > streams.close();
> 
> This would terminate after Streams has processed 100 messages from one
> partition. You can make it fancier, of course. Using interceptors, you
> should be able to do a similar thing.
> 
> Hope this helps.
> 
> 
> 
> -Matthias
> 
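[Editor's note] The interceptor remark above can be sketched without kafka-clients on the classpath. The interface below is a cut-down, hypothetical stand-in for Kafka's ConsumerInterceptor (whose real onConsume method takes and returns ConsumerRecords); only the counting logic is the point, under the assumption that an outer loop watches the flag and closes the Streams application:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CountingInterceptorSketch {

    // Cut-down stand-in for org.apache.kafka.clients.consumer.ConsumerInterceptor;
    // the real interface works on ConsumerRecords<K, V>, not a plain List.
    interface Interceptor<R> {
        List<R> onConsume(List<R> records);
    }

    // Counts every record flowing through poll() and raises a shutdown flag
    // once the limit is reached -- the count-and-close idea from the mail.
    static class CountingInterceptor<R> implements Interceptor<R> {
        private final int limit;
        private final AtomicBoolean shutdown;
        private int seen = 0;

        CountingInterceptor(int limit, AtomicBoolean shutdown) {
            this.limit = limit;
            this.shutdown = shutdown;
        }

        @Override
        public List<R> onConsume(List<R> records) {
            seen += records.size();
            if (seen >= limit) {
                shutdown.set(true);   // an external loop closes the streams app
            }
            return records;           // records pass through unchanged
        }
    }

    public static void main(String[] args) {
        AtomicBoolean shutdown = new AtomicBoolean(false);
        Interceptor<String> ic = new CountingInterceptor<>(3, shutdown);
        ic.onConsume(List.of("a", "b"));
        ic.onConsume(List.of("c"));
        System.out.println("shutdown=" + shutdown.get());
    }
}
```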
> On 8/7/18 5:18 PM, Kai Jiang wrote:
> > Hi community,
> > 
> > Context:
> > We are using Kafka Streams to write an application. Generally, we do transformations on messages from one topic to another (not using joins). 
> > 
> > To peek at the output before it flows to the destination topics, we want to add a debug mode that lets us consume only a certain number of messages (~1000) from the source topic. In this mode, the messages coming out of Kafka Streams should be diverted to stdout or files instead of the destination topic. That way, no messages are produced to Kafka and we can get a sense of what the result data looks like.
> > 
> > Questions:
> > 1. Is it possible to have Kafka Streams consume only n messages from the source topic and then close the stream?
> > 2. I think KafkaConsumerInterceptor is an option for counting messages. But I don't know if there is a way to close the Kafka Streams application when we reach a certain number.
> > 3. Another potential idea is to make some changes to the topology, e.g. the source node would only read messages in topic X from offset A to offset B, which we could set manually.
> > 
> > I was wondering which approach is feasible, or if there are better solutions. Thanks!
> > 
> > Best,
> > Kai
> > 
> > 
> 
Thank you!
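[Editor's note] The count-and-shutdown pattern from the reply above can be exercised as a self-contained simulation; no broker or Kafka dependency is required. The worker thread is a hypothetical stand-in for the Streams processing thread, and the polling loop mirrors the streams.start() / streams.close() shape of the original snippet:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownAfterN {

    // "Processes" records on a worker thread until `limit` is hit, while the
    // caller polls the shutdown flag and then tears down -- the same shape as
    // the streams.start()/streams.close() loop in the mail above.
    static int runUntil(int limit) throws InterruptedException {
        final AtomicBoolean shutdown = new AtomicBoolean(false);
        final AtomicInteger processed = new AtomicInteger();

        Thread worker = new Thread(() -> {
            // Stand-in for ForeachAction#apply on an endless record stream.
            while (!shutdown.get()) {
                if (processed.incrementAndGet() >= limit) {
                    shutdown.set(true);
                }
            }
        });
        worker.start();

        // Main thread polls the flag, then "closes" the topology.
        while (!shutdown.get()) {
            Thread.sleep(10);
        }
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed=" + runUntil(100));
    }
}
```

Because the worker re-checks the flag before every increment, the count stops exactly at the limit; with a real Streams app the cutoff is approximate, since records already in flight when close() is called may still be processed.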