Posted to users@kafka.apache.org by Ricardo Costa <rd...@gmail.com> on 2017/08/12 16:13:30 UTC

Forwarding consumer with kafka streams

Hi,

I've implemented a forwarding consumer which literally just consumes the
messages from a source topic, logs them and then publishes them to a target
topic.

I wanted to keep the implementation simple with very little code so I went
with kafka-streams. I have a really simple topology with a source for the
source topic, a sink for the target topic and a logging processor
in-between.

I'm quite happy with the solution; it is really simple and elegant. I ran some
basic tests and everything seemed to be working. As I went on to build more
test cases, I found that the stream only forwards messages if I push them
to the source topic *after* creating the stream and waiting until it is
fully initialized. Is this the expected behaviour? I need to be able to start
the stream at any point in time and have it forward the messages that were
already buffered on the source topic. Is kafka-streams not fit for this use
case? Or am I missing something?

Thanks in advance!

--
Ricardo

Re: Forwarding consumer with kafka streams

Posted by Ricardo Costa <rd...@gmail.com>.
Thank you for your help, Eno and Guozhang.

Indeed, I missed the obvious: I made a bad assumption about defaults and should
have checked the source code. I thought Kafka Streams was setting
AUTO_OFFSET_RESET_CONFIG to "earliest", and it does, but not in the version
I'm using! I'm on version 0.10.0.1, which does not touch the
AUTO_OFFSET_RESET_CONFIG default (which, as you know, is "latest"). Comparing
with what's in trunk for StreamsConfig, it seems a few things have changed
since version 0.10.0.1.
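
For readers on the same version, a minimal sketch of the explicit override
follows. It uses plain string keys so that it compiles without Kafka on the
classpath; the keys are the values behind StreamsConfig.APPLICATION_ID_CONFIG,
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. The class name, application id and
broker address are hypothetical, and the sketch assumes that StreamsConfig
passes consumer settings it does not override through to its embedded consumer.

```java
import java.util.Properties;

final class ForwarderConfig {

    // Builds the properties for a forwarding Streams app that should start
    // from the beginning of the source topic when it has no committed offsets.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Explicit override: do not rely on the version-dependent default.
        props.put("auto.offset.reset", "earliest");       // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical application id and broker address, for illustration only.
        Properties props = build("forwarder", "localhost:9092");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties would typically be handed to the KafkaStreams
constructor together with the topology. Note that the reset policy only takes
effect while the application id has no committed offsets for the source topic.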

Thanks again guys, really appreciate it.

--
Ricardo

On Mon, Aug 14, 2017 at 1:15 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Ricardo,
>
> What you described seems very similar to the demo example code here:
> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java
>
> If you start the program, it should just pipe all data from the source topic
> to the target topic, starting from the earliest offset, no matter how much
> data the source topic already has stored.
>
>
> Guozhang

Re: Forwarding consumer with kafka streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Ricardo,

What you described seems very similar to the demo example code here:
https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java

If you start the program, it should just pipe all data from the source topic
to the target topic, starting from the earliest offset, no matter how much
data the source topic already has stored.


Guozhang



On Sat, Aug 12, 2017 at 2:35 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Ricardo,
>
> Kafka Streams should handle that case as well. Which Streams config are you
> using? Could you share it? There is one parameter called
> “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG”, and by default it’s set to
> “earliest”. Any chance your app has changed it to “latest”?
>
> Thanks
> Eno


-- 
-- Guozhang

Re: Forwarding consumer with kafka streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Ricardo,

Kafka Streams should handle that case as well. Which Streams config are you using? Could you share it? There is one parameter called “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG”, and by default it’s set to “earliest”. Any chance your app has changed it to “latest”?
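
To illustrate why this parameter matters for the symptom described, here is a
simplified model of how a consumer picks its starting position on one
partition. It is only a sketch, not Kafka's actual implementation: the class,
enum and method names are hypothetical, and it ignores the case where a
committed offset has fallen out of range.

```java
import java.util.OptionalLong;

final class OffsetResetModel {

    enum ResetPolicy { EARLIEST, LATEST }

    // Returns the offset from which reading would start on a single partition.
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        // No committed offset: the reset policy decides where to start.
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // Hypothetical partition already holding 5 messages (offsets 0..4).
        long begin = 0, end = 5;
        // "latest": starts at 5, so the 5 buffered messages are skipped.
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, begin, end));
        // "earliest": starts at 0, so the buffered messages are forwarded.
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, begin, end));
        // A previously committed offset (3) is used regardless of the policy.
        System.out.println(startingOffset(OptionalLong.of(3), ResetPolicy.LATEST, begin, end));
    }
}
```

Under "latest", an application started for the first time only sees messages
published after it has initialized, which matches the behaviour reported in
the original question.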

Thanks
Eno
