Posted to users@kafka.apache.org by Dave Fayram <df...@gmail.com> on 2011/09/23 00:37:27 UTC

Aggregating counters in Kafka: how do I get the last consumed offset per partition?

Hi, I'm trying to write a system that takes a stream of many
small events (an N-way split of a Kafka topic over partitions) and
aggregates them into one larger interval, which is then flushed to a
write-through cache. For consistency, I'd like to turn off
autocommit for the partition and only update the read offset when I
actually flush the aggregated result down to the write-through cache.
For durability, I'd also like to record the offset as part of the
record I flush.

But I am having trouble figuring out exactly how to do this
with the Scala API. It seems like the client code can't easily get
access to this data. I know that the Hadoop consumer does it
(according to the other thread on offsets and message lengths), but I
can't really figure out how.

Any advice on how to do this? I thought I could use
ZookeeperConsumerStats, but this class seems to have been removed.
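
For concreteness, here is a rough sketch of the loop I have in mind,
written against the 0.7-era high-level (ZooKeeper) consumer. The
property names, the interval boundary, and the flushToCache helper are
placeholders from memory rather than tested code; the part I cannot
fill in is the offset itself:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object CounterAggregator {
      // Placeholder for the real write-through cache update.
      def flushToCache(aggregate: Long) { println("flushed " + aggregate) }

      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")   // assumed ZooKeeper address
        props.put("groupid", "counter-aggregator")  // assumed consumer group
        props.put("autocommit.enable", "false")     // no background offset commits

        val connector = Consumer.create(new ConsumerConfig(props))
        val stream = connector.createMessageStreams(Map("events" -> 1))("events").head

        var count = 0L                              // the aggregated counter
        for (message <- stream) {
          count += 1                                // fold each small event in
          if (count % 10000 == 0) {                 // arbitrary interval boundary
            // Ideally: flushToCache(count, lastOffset), but the high-level
            // consumer does not seem to expose lastOffset, which is exactly
            // what I am asking about.
            flushToCache(count)
            connector.commitOffsets()               // checkpoint the read position in ZK
          }
        }
      }
    }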

-- 
--
Dave Fayram
dfayram@gmail.com

Re: Aggregating counters in Kafka: how do I get the last consumed offset per partition?

Posted by Jun Rao <ju...@gmail.com>.
Dave,

If you have to know the offset, currently you have to use SimpleConsumer
instead of the high-level consumer. In that case, you would have to manage
reading from all partitions and load balancing among them yourself.

Jun
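
Roughly, the SimpleConsumer route looks like the sketch below in the
0.7-era Scala API. The constructor argument order, the FetchRequest
fields, and the assumption that MessageAndOffset.offset is the position
to resume from after a message are from memory, so verify them against
your Kafka version:

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.consumer.SimpleConsumer

    object OffsetAwareFetcher {
      def main(args: Array[String]) {
        val topic = "events"                        // assumed topic name
        val partition = 0
        // host, port, socket timeout (ms), buffer size: assumed 0.7 argument order
        val consumer = new SimpleConsumer("localhost", 9092, 30000, 1024 * 1024)

        // Start from the earliest offset the broker still holds for this partition.
        var offset = consumer.getOffsetsBefore(topic, partition,
                                               OffsetRequest.EarliestTime, 1).head

        while (true) {
          val messages = consumer.fetch(new FetchRequest(topic, partition, offset, 1024 * 1024))
          var sawAny = false
          for (msgAndOffset <- messages) {
            sawAny = true
            // process msgAndOffset.message here ...
            // As I recall the 0.7 API, msgAndOffset.offset is the position to
            // fetch from after this message, i.e. the value to store alongside
            // a flushed aggregate.
            offset = msgAndOffset.offset
          }
          if (!sawAny) Thread.sleep(1000)           // back off when caught up
        }
      }
    }

With something like this, the offset stored with each flushed record is
simply the last msgAndOffset.offset seen before the flush.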

On Thu, Sep 22, 2011 at 11:02 PM, Dave Fayram <df...@gmail.com> wrote:

> I think I do need to know the offset, though.
>
> The write-through cache does its best to write through, but if that
> fails, then we need to have the last committed offset stored.
> Should the worst-case scenario occur, we can simply resume from the
> offset we did manage to commit and things hum along.
>
> Is this considered a bad decision with Kafka? I'm trying to make sure
> I do not drop data or replay it.
>
> - dlf
> On Thu, Sep 22, 2011 at 6:39 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > Kafka internally keeps track of the valid offsets. You just need to call
> > commitOffsets() for the offsets to be checkpointed in ZK. You don't need
> > to know the offset directly.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Sep 22, 2011 at 3:37 PM, Dave Fayram <df...@gmail.com> wrote:
> >
> >> Hi, I'm trying to write a system that takes a stream of many
> >> small events (an N-way split of a Kafka topic over partitions) and
> >> aggregates them into one larger interval, which is then flushed to a
> >> write-through cache. For consistency, I'd like to turn off
> >> autocommit for the partition and only update the read offset when I
> >> actually flush the aggregated result down to the write-through cache.
> >> For durability, I'd also like to record the offset as part of the
> >> record I flush.
> >>
> >> But I am having trouble figuring out exactly how to do this
> >> with the Scala API. It seems like the client code can't easily get
> >> access to this data. I know that the Hadoop consumer does it
> >> (according to the other thread on offsets and message lengths), but I
> >> can't really figure out how.
> >>
> >> Any advice on how to do this? I thought I could use
> >> ZookeeperConsumerStats, but this class seems to have been removed.
> >>
> >> --
> >> --
> >> Dave Fayram
> >> dfayram@gmail.com
> >>
> >
>
>
>
> --
> --
> Dave Fayram
> dfayram@gmail.com
>

Re: Aggregating counters in Kafka: how do I get the last consumed offset per partition?

Posted by Dave Fayram <df...@gmail.com>.
I think I do need to know the offset, though.

The write-through cache does its best to write through, but if that
fails, then we need to have the last committed offset stored.
Should the worst-case scenario occur, we can simply resume from the
offset we did manage to commit and things hum along.

Is this considered a bad decision with Kafka? I'm trying to make sure
I do not drop data or replay it.

- dlf
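
To make the recovery path concrete, here is a hedged sketch of that
restart logic. readLastFlushedOffset is a hypothetical stand-in for
reading the offset stored with the last successfully flushed aggregate,
and the SimpleConsumer calls are the 0.7-era ones from memory:

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.consumer.SimpleConsumer

    object ResumeFromCache {
      // Hypothetical: look up the offset stored alongside the last aggregate
      // that made it into the write-through cache, if there is one.
      def readLastFlushedOffset(topic: String, partition: Int): Option[Long] = None

      def main(args: Array[String]) {
        val topic = "events"
        val partition = 0
        val consumer = new SimpleConsumer("localhost", 9092, 30000, 1024 * 1024)

        // Resume exactly where the last flush left off; if nothing was ever
        // flushed, fall back to the earliest offset the broker still holds.
        val start = readLastFlushedOffset(topic, partition).getOrElse(
          consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime, 1).head)

        val messages = consumer.fetch(new FetchRequest(topic, partition, start, 1024 * 1024))
        for (msgAndOffset <- messages)
          println("replaying from " + start + ", next offset " + msgAndOffset.offset)
      }
    }
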
On Thu, Sep 22, 2011 at 6:39 PM, Jun Rao <ju...@gmail.com> wrote:
>
> Kafka internally keeps track of the valid offsets. You just need to call
> commitOffsets() for the offsets to be checkpointed in ZK. You don't need to
> know the offset directly.
>
> Thanks,
>
> Jun
>
> On Thu, Sep 22, 2011 at 3:37 PM, Dave Fayram <df...@gmail.com> wrote:
>
>> Hi, I'm trying to write a system that takes a stream of many
>> small events (an N-way split of a Kafka topic over partitions) and
>> aggregates them into one larger interval, which is then flushed to a
>> write-through cache. For consistency, I'd like to turn off
>> autocommit for the partition and only update the read offset when I
>> actually flush the aggregated result down to the write-through cache.
>> For durability, I'd also like to record the offset as part of the
>> record I flush.
>>
>> But I am having trouble figuring out exactly how to do this
>> with the Scala API. It seems like the client code can't easily get
>> access to this data. I know that the Hadoop consumer does it
>> (according to the other thread on offsets and message lengths), but I
>> can't really figure out how.
>>
>> Any advice on how to do this? I thought I could use
>> ZookeeperConsumerStats, but this class seems to have been removed.
>>
>> --
>> --
>> Dave Fayram
>> dfayram@gmail.com
>>
>



-- 
--
Dave Fayram
dfayram@gmail.com

Re: Aggregating counters in Kafka: how do I get the last consumed offset per partition?

Posted by Jun Rao <ju...@gmail.com>.
Dave,

Kafka internally keeps track of the valid offsets. You just need to call
commitOffsets() for the offsets to be checkpointed in ZK. You don't need to
know the offset directly.

Thanks,

Jun
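
For what it's worth, as I recall the 0.7-era layout, those checkpoints
land in ZooKeeper under /consumers/[group]/offsets/[topic]/[brokerId-partitionId],
so the committed value can also be read back directly. The sketch below
only illustrates that assumed layout with a plain ZooKeeper client; it
is not something suggested in this thread:

    import org.apache.zookeeper.ZooKeeper

    object ReadCommittedOffset {
      def main(args: Array[String]) {
        // Assumed layout: /consumers/[group]/offsets/[topic]/[brokerId-partitionId]
        val path = "/consumers/counter-aggregator/offsets/events/0-0"
        val zk = new ZooKeeper("localhost:2181", 30000, null)
        val bytes = zk.getData(path, false, null)
        println("last committed offset: " + new String(bytes, "UTF-8"))
        zk.close()
      }
    }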

On Thu, Sep 22, 2011 at 3:37 PM, Dave Fayram <df...@gmail.com> wrote:

> Hi, I'm trying to write a system that takes a stream of many
> small events (an N-way split of a Kafka topic over partitions) and
> aggregates them into one larger interval, which is then flushed to a
> write-through cache. For consistency, I'd like to turn off
> autocommit for the partition and only update the read offset when I
> actually flush the aggregated result down to the write-through cache.
> For durability, I'd also like to record the offset as part of the
> record I flush.
>
> But I am having trouble figuring out exactly how to do this
> with the Scala API. It seems like the client code can't easily get
> access to this data. I know that the Hadoop consumer does it
> (according to the other thread on offsets and message lengths), but I
> can't really figure out how.
>
> Any advice on how to do this? I thought I could use
> ZookeeperConsumerStats, but this class seems to have been removed.
>
> --
> --
> Dave Fayram
> dfayram@gmail.com
>