Posted to users@kafka.apache.org by navneet sharma <na...@gmail.com> on 2013/01/14 19:34:33 UTC

hadoop-consumer code in contrib package

Hi,

I am trying to use the code supplied in the hadoop-consumer package. I am
running into the following issues:

1) This code uses SimpleConsumer, which contacts the Kafka broker directly
without going through Zookeeper. Because of this, messages are not getting
cleared from the broker, and I am getting duplicate messages in each run.

2) The retention policy specified as log.retention.hours in
server.properties is not working. I am not sure if this is due to
SimpleConsumer.

Is this expected behaviour? Is there any code that uses the high-level
consumer for the same work?

Thanks,
Navneet Sharma

Re: hadoop-consumer code in contrib package

Posted by Jun Rao <ju...@gmail.com>.
That may be a feasible alternative approach. You can
call ConsumerConnector.shutdown() to close the consumer cleanly.

Thanks,

Jun
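
For illustration, here is a minimal sketch of a high-level consumer that
drains a topic and then shuts down cleanly via ConsumerConnector.shutdown().
It assumes the 0.8-era Java consumer API and hypothetical topic, group, and
ZooKeeper settings; on 0.7 the property names differ (e.g. zk.connect and
groupid), but the shutdown() call works the same way. Setting
consumer.timeout.ms makes the stream iterator throw ConsumerTimeoutException
once no message arrives within that interval, which gives the loop a natural
exit point.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class DrainAndShutdown {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // hypothetical
            props.put("group.id", "hdfs-loader");             // hypothetical group
            props.put("consumer.timeout.ms", "10000");        // stop after 10s of silence

            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put("my-topic", 1);                    // hypothetical topic
            KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicCount).get("my-topic").get(0);

            try {
                for (MessageAndMetadata<byte[], byte[]> record : stream) {
                    process(record.message());                // application-specific handling
                }
            } catch (ConsumerTimeoutException e) {
                // no messages for consumer.timeout.ms: treat the topic as drained
            } finally {
                consumer.shutdown();                          // release ZK session and fetcher threads
            }
        }

        private static void process(byte[] payload) { /* write to HDFS, etc. */ }
    }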

Re: hadoop-consumer code in contrib package

Posted by navneet sharma <na...@gmail.com>.
That makes sense.

I tried an alternate approach: I am using the high-level consumer and
pushing data into HDFS through the Hadoop HDFS APIs.

I am not creating any jobs for that.

The only problem I am seeing here is that the consumer is designed to run
forever, which means I need to find out how to close the HDFS file and shut
down the consumer.

Is there any way to kill or close the high-level consumer gracefully?

I am running v0.7.0. I don't mind upgrading to a higher version if that
allows this kind of consumer handling.

Thanks,
Navneet
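
For the HDFS side of the approach described above, a minimal sketch of the
pattern might look like the class below. It uses only the standard Hadoop
FileSystem API; the path layout and class name are assumptions for
illustration, not part of the contrib code. The important detail is that
the FSDataOutputStream must be closed before the process exits, otherwise
the tail of the file may not become visible in HDFS.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Appends Kafka message payloads to one HDFS file, one payload per line. */
    public class HdfsSink {
        private final FileSystem fs;
        private final FSDataOutputStream out;

        public HdfsSink(String pathOnHdfs) throws IOException {
            fs = FileSystem.get(new Configuration());
            out = fs.create(new Path(pathOnHdfs));  // e.g. "/data/kafka/my-topic/run-1" (hypothetical)
        }

        public void write(byte[] payload) throws IOException {
            out.write(payload);
            out.write('\n');
        }

        /** Must be called before the JVM exits, or the last block may be lost. */
        public void close() throws IOException {
            out.close();
            fs.close();
        }
    }

In the consume loop one would call sink.write(...) for each message and then
sink.close() in the same finally block as consumer.shutdown(); registering
that cleanup in a JVM shutdown hook (Runtime.getRuntime().addShutdownHook)
also covers the case where the process is killed rather than exiting on its
own.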


Re: hadoop-consumer code in contrib package

Posted by Jun Rao <ju...@gmail.com>.
I think the main reason for using SimpleConsumer is to manage offsets
explicitly. For example, this is useful when Hadoop retries failed tasks.
Another reason is that Hadoop already does load balancing. So, there is not
much need to balance the load again using the high level consumer.

Thanks,

Jun

Re: hadoop-consumer code in contrib package

Posted by navneet sharma <na...@gmail.com>.
Thanks Felix.

One question still remains: why SimpleConsumer?
Why not the high-level consumer? If I change the code to the high-level
consumer, will it create any challenges?


Navneet


Re: hadoop-consumer code in contrib package

Posted by Felix GV <fe...@mate1inc.com>.
Please read the Kafka design paper <http://kafka.apache.org/design.html>.

It may look a little long, but it's as short as it can be. Kafka differs
from other messaging systems in a couple of ways, and it's important to
understand the fundamental design choices that were made in order to
understand the way Kafka works.

I believe my previous email already answers both your offset tracking and
retention questions, but if my explanations are not clear enough, then the
next best thing is probably to read the design paper :)

--
Felix


Re: hadoop-consumer code in contrib package

Posted by navneet sharma <na...@gmail.com>.
Thanks Felix for sharing your work. The contrib hadoop-consumer looks like it
works the same way.

I think I really need to understand this offset stuff. So far I have used
only the high-level consumer. When the consumer is done reading all the
messages, I used to kill the process (because it won't stop on its own).

Then I used the producer to pump more messages and a new consumer process to
read them (a new process because I had killed the previous consumer).

But I never saw messages getting duplicated.

So it is not very clear to me how offsets are tracked, specifically when I
re-launch the consumer. And why is the retention policy not working when
used with SimpleConsumer? For my experiment I set it to 4 hours.

Please help me understand.

Thanks,
Navneet


Re: hadoop-consumer code in contrib package

Posted by Felix GV <fe...@mate1inc.com>.
I think you may be misunderstanding the way Kafka works.

A Kafka broker is never supposed to clear messages just because a consumer
read them.

The Kafka broker will instead clear messages after their retention period
ends, though it will not delete the messages at the exact time when they
expire. Instead, a background process will periodically delete a batch of
expired messages. The retention policies guarantee a minimum retention
time, not an exact retention time.
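
For reference, the relevant retention settings in server.properties look
roughly like the snippet below. Only log.retention.hours comes from this
thread; the other property names are assumptions based on the 0.7-era broker
configuration and should be verified against the documentation for the
version in use. Deletion happens per rolled log segment, so messages in the
currently active segment stay around until that segment is closed.

    # server.properties (sketch; verify names against your Kafka version)
    log.retention.hours=4           # minimum time to keep messages (as in the 4-hour experiment)
    log.file.size=536870912         # segment size; only full, rolled segments can be deleted
    log.cleanup.interval.mins=10    # how often the background cleaner looks for expired segments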

It is the responsibility of each consumer to keep track of which messages
they have consumed already (by recording an offset for each consumed
partition). The high-level consumer stores these offsets in ZK. The simple
consumer has no built-in capability to store and manage offsets, so it is
the developer's responsibility to do so. In the case of the hadoop consumer
in the contrib package, these offsets are stored in offset files within
HDFS.
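
As a rough illustration of that pattern (not the contrib consumer's actual
file format), an offset checkpoint on HDFS can be as simple as a tiny file
holding the last consumed offset per topic/partition, read at the start of a
run and rewritten once the fetched data has been safely stored:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** One long offset per topic/partition, kept in a small HDFS file (illustrative only). */
    public class OffsetCheckpoint {
        private final FileSystem fs;
        private final Path path;

        public OffsetCheckpoint(String topic, int partition) throws IOException {
            fs = FileSystem.get(new Configuration());
            path = new Path("/offsets/" + topic + "/" + partition);  // hypothetical layout
        }

        /** Offset to resume from; 0 on the very first run. */
        public long read() throws IOException {
            if (!fs.exists(path)) return 0L;
            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)));
            try {
                return Long.parseLong(in.readLine().trim());
            } finally {
                in.close();
            }
        }

        /** Overwrite the checkpoint only after the consumed data is durable. */
        public void write(long nextOffset) throws IOException {
            PrintWriter out = new PrintWriter(new OutputStreamWriter(fs.create(path, true)));
            out.println(nextOffset);
            out.close();
        }
    }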

I wrote a blog post a while ago that explains how to use the offset files
generated by the contrib consumer to do incremental consumption (so that
you don't get duplicated messages by re-consuming everything in subsequent
runs).

http://felixgv.com/post/69/automating-incremental-imports-with-the-kafka-hadoop-consumer/

I'm not sure how up to date this is, regarding the current Kafka versions,
but it may still give you some useful pointers...

--
Felix
