Posted to users@kafka.apache.org by Alan Woodward <al...@flax.co.uk> on 2016/04/14 12:14:49 UTC

KStreams - reading from the start of a stream

Hi all,

I spoke at the London Kafka meetup last night about searching streaming data (write-up here: http://www.flax.co.uk/blog/2016/04/14/apache-kafka-london-meetup-real-time-search-insights/), and as part of the preparation for the talk I tried porting some Samza code I have to the KStreams library.

My first impressions are that KStreams is very nice, particularly when it comes to deploying and testing (no need for YARN, yay!).  I have a couple of questions though:

- I couldn't work out how to open a stream and read it from the beginning.  My use case here is a cached search engine, where stored queries are run over documents in a stream.  Both queries and documents are stored in Kafka topics, and when the processor starts it needs to read the query topic from the beginning to construct its cache.  StreamBuilder.table() and StreamBuilder.stream() seem to create consumers that join the end of topics only.

- There doesn't seem to be a nice way of closing resources on shutdown.  Is there a plan to add shutdown hooks, or maybe a KafkaStreams.join() method that waits for the internal threads to be interrupted?

My (broken!) code can be found here: https://github.com/romseygeek/luwak-kstream/blob/master/src/main/java/com/flaxsearch/luwak_kstreams/StreamMonitor.java

Thanks,

Alan Woodward
www.flax.co.uk



Re: KStreams - reading from the start of a stream

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Alan,

The Kafka Streams library relies on the underlying Kafka consumer client for
reading data, and you can pass any consumer configs into the StreamsConfig
as well. In the example below you can see that developers can set both
Streams configs and underlying client configs in the same StreamsConfig,
which is passed along to the underlying client:

https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L56

And for your case you can override "auto.offset.reset" to "earliest" so
that consumers will start from the beginning of the log IF there are no
offsets committed to Kafka from a previous run (so for your case, you
need to not commit offsets); but bear in mind that this config is
universal: every stream the application consumes will be affected and
read from the beginning of the log.
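
For example, a minimal sketch (the application id, bootstrap servers, and
class name here are just placeholders I've made up, not anything from
your code):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EarliestOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Streams-level configs
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "query-cache-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Underlying consumer config, passed through the same properties:
        // start from the earliest offset when no committed offset exists.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        StreamsConfig config = new StreamsConfig(props);
        // ... build the topology and construct KafkaStreams with this config
    }
}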

We are thinking of providing some finer grained flow control as well, but
we do not have a concrete proposal yet:

https://issues.apache.org/jira/browse/KAFKA-3478


About closing resources: are these "resources" application-specific or
internal to Kafka Streams? Currently the KafkaStreams.close() function will
interrupt and stop the stream threads, checkpoint the processing state,
and so on; but internal topics / data are kept just for restoration during
the failover process. And we are planning to add a feature that allows
users to "wipe out" all the internal state if they are sure that the
application can be completely "stopped" or just want to re-run from scratch:

https://issues.apache.org/jira/browse/KAFKA-3185
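
As a workaround for now, you can register a JVM shutdown hook that calls
close() yourself. A rough sketch, reusing the StreamsConfig named "config"
from the snippet above, with the topology itself left as whatever your
application defines:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
// ... define sources / processors / sinks on builder ...

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

// Stop the stream threads (and close application resources) on JVM exit.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
    // close application-specific resources here as well
}));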



Guozhang



-- 
-- Guozhang