Posted to dev@samza.apache.org by Renato Marroquín Mogrovejo <re...@gmail.com> on 2015/05/22 17:06:58 UTC

Fwd: Not able to consume produced data

Hi there,

I have developed an RSS reader application using Samza, but I am having
problems using kafka-console-consumer to read what has been fetched
from the RSS feeds.
My RssConsumer uses an RssFeed object which fetches (until stopped by the
RssConsumer) all new RSS feeds and hands them back to the RssConsumer, which
puts them into a single partition [2]. From my logs I can see that new
feeds are being put into my partition, but I cannot get them with the
console consumer. I guess I am not putting them in correctly, or I am
producing some blocking behaviour somehow?
I am using the hello-samza 0.10 project, which uses the latest Samza version
as well. I don't think this is an issue with Samza; it is probably something
I am doing wrong :(
Thanks in advance for any suggestions.


Renato M.

[1] https://github.com/renato2099/hello-samza
[2]
https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
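
For readers skimming the thread, here is a minimal sketch of the
BlockingEnvelopeMap pattern this message describes. It is not the code from
the linked repository; the class name and the onNewEntry callback are made up
for illustration.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;

// Illustrative only: a consumer built on BlockingEnvelopeMap that hands
// every fetched RSS entry to Samza on a single SystemStreamPartition.
public class SketchRssConsumer extends BlockingEnvelopeMap {

    private SystemStreamPartition ssp;

    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        super.register(systemStreamPartition, offset);
        // Single-partition system, so remember the one SSP the container registers.
        this.ssp = systemStreamPartition;
    }

    @Override
    public void start() {
        // Feed fetching would be kicked off here (see the follow-up mail below).
    }

    @Override
    public void stop() {
        // Feed fetching would be shut down here.
    }

    // Called by whatever fetches the feeds; hands one entry to Samza.
    public void onNewEntry(String entryJson) {
        try {
            // BlockingEnvelopeMap.put() queues the message so poll() can return it.
            put(ssp, new IncomingMessageEnvelope(ssp, null, null, entryJson));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}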

Re: Not able to consume produced data

Posted by Renato Marroquín Mogrovejo <re...@gmail.com>.
Thanks for the input guys!

Yes, I did get that the consumer would be putting data into Samza. So
maybe I am not understanding something else :(
I have an RssSystemFactory which creates an RssConsumer (which
extends BlockingEnvelopeMap, which in turn implements SystemConsumer). This
"RssConsumer" acts as an observer of an "RssFeed", which checks the RSS URLs
and puts new entries into the consumer; this works because my consumer is a
BlockingEnvelopeMap. My "RssFeed" should only stop when "RssConsumer" calls
its stop method.

This was my problem. I thought that this "RssFeed", created together with
the RssConsumer by the SystemFactory, was already running in a separate
thread, but it wasn't. So now I have created a separate thread to check for
RSS updates, and that thread puts the data into my consumer. This works as I
expected.
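
Roughly, the change looks like this. The sketch below is not the actual
commit; the RssFeed/RssConsumer stand-ins and method names are assumed. The
point is just that the polling loop now lives on its own thread, started from
the consumer's start() and stopped from its stop(), and pushes entries back
into the consumer:

import java.util.List;

// Sketch of the fix described above (assumed names throughout): the feed
// polling runs on its own thread and hands every new entry back to the
// consumer, which queues it via BlockingEnvelopeMap.put().
public class FeedPoller implements Runnable {

    // Minimal stand-ins for the real RssFeed and RssConsumer types.
    public interface Feed { List<String> fetchNewEntries(); }
    public interface EntrySink { void onNewEntry(String entry); }

    private final Feed feed;
    private final EntrySink consumer;
    private volatile boolean running = true;

    public FeedPoller(Feed feed, EntrySink consumer) {
        this.feed = feed;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            for (String entry : feed.fetchNewEntries()) {
                consumer.onNewEntry(entry);   // ends up in BlockingEnvelopeMap.put()
            }
            try {
                Thread.sleep(60_000);         // wait before polling the URLs again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}

// In the consumer, roughly:
//   start(): poller = new FeedPoller(feed, this); new Thread(poller, "rss-poller").start();
//   stop():  poller.shutdown();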

I execute the kafka consumer from the console:

deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181
--topic rss-raw

Thanks for the input; this helped me review what I was doing wrong :)


Renato M.

2015-05-22 20:29 GMT+02:00 Yan Fang <ya...@gmail.com>:

> Hi Renato,
>
> There may be a misunderstanding of the concepts. A Consumer feeds messages
> into Samza, while a Producer sends messages from Samza to other
> systems. So if you implement the Consumer, you should be able to see the
> messages in the StreamTask. That's why you confused Naveen.
>
> Cheers,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Fri, May 22, 2015 at 9:59 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Renato,
> >
> > Could you paste your console consumer command here?
> >
> > Guozhang
> >
> > On Fri, May 22, 2015 at 9:43 AM, Naveen S <na...@gmail.com> wrote:
> >
> > > Hey Renato,
> > >                     Is there any specific reason why you are extending
> > > the blocking envelope class instead of implementing a StreamTask?
> > > http://samza.apache.org/learn/documentation/0.9/api/overview.html
> > >
> > > --Naveen
> > >
> > >
> > > On Fri, May 22, 2015 at 8:06 AM, Renato Marroquín Mogrovejo <
> > > renatoj.marroquin@gmail.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > I have developed an RSS reader application using Samza, but I am
> > > > having problems using kafka-console-consumer to read what has been
> > > > fetched from the RSS feeds.
> > > > My RssConsumer uses an RssFeed object which fetches (until stopped
> > > > by the RssConsumer) all new RSS feeds and hands them back to the
> > > > RssConsumer, which puts them into a single partition [2]. From my
> > > > logs I can see that new feeds are being put into my partition, but
> > > > I cannot get them with the console consumer. I guess I am not
> > > > putting them in correctly, or I am producing some blocking
> > > > behaviour somehow?
> > > > I am using the hello-samza 0.10 project, which uses the latest
> > > > Samza version as well. I don't think this is an issue with Samza;
> > > > it is probably something I am doing wrong :(
> > > > Thanks in advance for any suggestions.
> > > >
> > > >
> > > > Renato M.
> > > >
> > > > [1] https://github.com/renato2099/hello-samza
> > > > [2]
> > > > https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: Not able to consume produced data

Posted by Yan Fang <ya...@gmail.com>.
Hi Renato,

There may be a misunderstanding of the concepts. A Consumer feeds messages
into Samza, while a Producer sends messages from Samza to other
systems. So if you implement the Consumer, you should be able to see the
messages in the StreamTask. That's why you confused Naveen.
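
To make the direction concrete, the wiring in the job's properties file
looks roughly like this (system, stream, and class names below are assumed
for illustration, not taken from Renato's repo):

# Assumed names, for illustration only.
systems.rss.samza.factory=samza.examples.rss.system.RssSystemFactory
# The Consumer side feeds messages INTO Samza on this input stream ...
task.inputs=rss.rss-entries
task.class=samza.examples.rss.task.RssFeedStreamTask
# ... and the Producer side can send them OUT, e.g. to a Kafka topic that
# kafka-console-consumer then reads.
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory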

Cheers,

Fang, Yan
yanfang724@gmail.com

On Fri, May 22, 2015 at 9:59 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Renato,
>
> Could you paste your console consumer command here?
>
> Guozhang
>
> On Fri, May 22, 2015 at 9:43 AM, Naveen S <na...@gmail.com> wrote:
>
> > Hey Renato,
> >                     Is there any specific reason why you are extending
> > the blocking envelope class instead of implementing a StreamTask?
> > http://samza.apache.org/learn/documentation/0.9/api/overview.html
> >
> > --Naveen
> >
> >
> > On Fri, May 22, 2015 at 8:06 AM, Renato Marroquín Mogrovejo <
> > renatoj.marroquin@gmail.com> wrote:
> >
> > > Hi there,
> > >
> > > I have developed an RSS reader application using Samza, but I am
> > > having problems using kafka-console-consumer to read what has been
> > > fetched from the RSS feeds.
> > > My RssConsumer uses an RssFeed object which fetches (until stopped
> > > by the RssConsumer) all new RSS feeds and hands them back to the
> > > RssConsumer, which puts them into a single partition [2]. From my
> > > logs I can see that new feeds are being put into my partition, but
> > > I cannot get them with the console consumer. I guess I am not
> > > putting them in correctly, or I am producing some blocking
> > > behaviour somehow?
> > > I am using the hello-samza 0.10 project, which uses the latest
> > > Samza version as well. I don't think this is an issue with Samza;
> > > it is probably something I am doing wrong :(
> > > Thanks in advance for any suggestions.
> > >
> > >
> > > Renato M.
> > >
> > > [1] https://github.com/renato2099/hello-samza
> > > [2]
> > > https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Not able to consume produced data

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Renato,

Could you paste your console consumer command here?

Guozhang

On Fri, May 22, 2015 at 9:43 AM, Naveen S <na...@gmail.com> wrote:

> Hey Renato,
>                     Is there any specific reason why you are extending the
> blocking envelope class instead of implementing a StreamTask?
> http://samza.apache.org/learn/documentation/0.9/api/overview.html
>
> --Naveen
>
>
> On Fri, May 22, 2015 at 8:06 AM, Renato Marroquín Mogrovejo <
> renatoj.marroquin@gmail.com> wrote:
>
> > Hi there,
> >
> > I have developed an RSS reader application using Samza, but I am having
> > problems using kafka-console-consumer to read what has been fetched
> > from the RSS feeds.
> > My RssConsumer uses an RssFeed object which fetches (until stopped by
> > the RssConsumer) all new RSS feeds and hands them back to the
> > RssConsumer, which puts them into a single partition [2]. From my logs
> > I can see that new feeds are being put into my partition, but I cannot
> > get them with the console consumer. I guess I am not putting them in
> > correctly, or I am producing some blocking behaviour somehow?
> > I am using the hello-samza 0.10 project, which uses the latest Samza
> > version as well. I don't think this is an issue with Samza; it is
> > probably something I am doing wrong :(
> > Thanks in advance for any suggestions.
> >
> >
> > Renato M.
> >
> > [1] https://github.com/renato2099/hello-samza
> > [2]
> > https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
> >
>



-- 
-- Guozhang

Re: Not able to consume produced data

Posted by Naveen S <na...@gmail.com>.
Hey Renato,
                    Is there any specific reason why you are extending the
blocking envelope class instead of implementing a StreamTask?
http://samza.apache.org/learn/documentation/0.9/api/overview.html
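
For reference, the StreamTask side from that overview looks roughly like the
sketch below (the class name and the output system/stream are assumptions):
it processes whatever the SystemConsumer feeds into Samza and can forward it
to a Kafka topic.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Sketch only (assumed names): a task that receives whatever the
// SystemConsumer handed to Samza and forwards it to a Kafka topic.
public class SketchRssFeedStreamTask implements StreamTask {

    private static final SystemStream OUTPUT = new SystemStream("kafka", "rss-raw");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // envelope.getMessage() is the RSS entry the consumer put into Samza.
        collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));
    }
}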

--Naveen


On Fri, May 22, 2015 at 8:06 AM, Renato Marroquín Mogrovejo <
renatoj.marroquin@gmail.com> wrote:

> Hi there,
>
> I have developed an RSS reader application using Samza, but I am having
> problems using kafka-console-consumer to read what has been fetched
> from the RSS feeds.
> My RssConsumer uses an RssFeed object which fetches (until stopped by the
> RssConsumer) all new RSS feeds and hands them back to the RssConsumer, which
> puts them into a single partition [2]. From my logs I can see that new
> feeds are being put into my partition, but I cannot get them with the
> console consumer. I guess I am not putting them in correctly, or I am
> producing some blocking behaviour somehow?
> I am using the hello-samza 0.10 project, which uses the latest Samza version
> as well. I don't think this is an issue with Samza; it is probably something
> I am doing wrong :(
> Thanks in advance for any suggestions.
>
>
> Renato M.
>
> [1] https://github.com/renato2099/hello-samza
> [2]
>
> https://github.com/renato2099/hello-samza/blob/master/src/main/java/samza/examples/rss/system/RssConsumer.java#L42
>