You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2014/04/07 18:21:15 UTC

Review for the new consumer APIs

Hi,

I'm looking for people to review the new consumers APIs. Patch is posted at
https://issues.apache.org/jira/browse/KAFKA-1328

Thanks,
Neha

Re: Review for the new consumer APIs

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Neha, will do that today.

Guozhang


On Mon, Apr 7, 2014 at 9:21 AM, Neha Narkhede <ne...@gmail.com>wrote:

> Hi,
>
> I'm looking for people to review the new consumers APIs. Patch is posted at
> https://issues.apache.org/jira/browse/KAFKA-1328
>
> Thanks,
> Neha
>



-- 
-- Guozhang

Re: Review for the new consumer APIs

Posted by Neha Narkhede <ne...@gmail.com>.
1) We would like to find out what is latest message per partition that
current consumer is connected?

I'm not sure I understood what you meant by "connected". The new consumer
will expose the
position()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position%28org.apache.kafka.common.TopicPartition%29>API
that tells you the offset of the next message, per partition, the
consumer will fetch on the next
poll()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll%28long,%20java.util.concurrent.TimeUnit%29>
.

2) If the consumer lags behind by certain offset or by time, consumer can
seek to particular offset(which we can use seek method for this).

Yes, you can find the difference between the offset returned by the
committed()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#committed%28org.apache.kafka.common.TopicPartition...%29>API
and diff that with the offset returned by the
offsetsBeforeTime()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsBeforeTime%28long,%20org.apache.kafka.common.TopicPartition...%29>API.
This tells you the lag and you can then use
seek()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek%28java.util.Map%29>to
reset the offset to a newer value.

3) How can we start a temp consumer for same partition to read messages
based on offset range (last consume offset from part 2 to current offset
that we jumped to in part 2) ?

You can do this by firing off a new consumer with
auto.offset.commit=disable, then using the commit() API to overwrite the
offset and then using
poll()<http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll%28long,%20java.util.concurrent.TimeUnit%29>until
you reach the target offset.

Basically, is there a QOS concept per partition where consumer always needs
to consume latest message and detect a lag behind and start TEMP consumer
for back-fill.


How does Linked in handle the near real time consumption for operation
metrics ?

We tightly monitor the lag on our consumers and fix the issue causing the
lag to bring the lag down.


On Mon, May 5, 2014 at 5:30 PM, Bhavesh Mistry
<mi...@gmail.com>wrote:

> Hi Neha,
>
> How will new Consumer help us with implementing following use case?
>
>
>
> We have heartbeat as one of topics and all application servers publish
> metric to this topic.  We have to meet near real-time consume SLA (less
> than 30 seconds).
>
> 1) We would like to find out what is latest message per partition that
> current consumer is connected?
>
> 2) If the consumer lags behind by certain offset or by time, consumer can
> seek to particular offset(which we can use seek method for this).
>
> 3) How can we start a temp consumer for same partition to read messages
> based on offset range (last consume offset from part 2 to current offset
> that we jumped to in part 2) ?
>
>
>
> Basically, is there a QOS concept per partition where consumer always needs
> to consume latest message and detect a lag behind and start TEMP consumer
> for back-fill.
>
>
> How does Linked in handle the near real time consumption for operation
> metrics ?
>
>
> Thanks,
>
>
> Bhavesh
>
>
> On Sat, Apr 12, 2014 at 6:58 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > Why cant we pass a callback in subscribe itself?
> >
> > Mainly because it will make the processing kind of awkward since you need
> > to access the other consumer APIs while processing the messages. Your
> > suggestion does point out a problem with the poll() API though. Here is
> the
> > initial proposal of the poll() API-
> >
> > List<ConsumerRecord> poll(long timeout, TimeUnit unit);
> >
> > The application subscribes to topics or partitions and expects to process
> > messages per topic or per partition respectively. By just returning a
> list
> > of ConsumerRecord objects, we make it difficult for the application to
> > process messages naturally grouped by topic or partition. After some
> > thought, I changed it to -
> >
> > Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);
> >
> > ConsumerRecordMetadata allows you to get records for a particular
> partition
> > or get records for all partitions.
> >
> > The second change I made is to the commit APIs. To remain consistent with
> > the Producer, I changed commit() to return a Future and got rid of
> > commitAsync(). This will easily support the sync and async commit use
> > cases.
> >
> > Map<TopicPartition,OffsetMetadata
> > <
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html
> > >>>
> > commit(Map<TopicPartition,Long> offsets);
> >
> > I'm looking for feedback on these changes. I've published the new javadoc
> > to the same location<
> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc>.
> > Appreciate if someone can take a look.
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi <
> > priyadarshi.pushkar@gmail.com> wrote:
> >
> > > Was trying to understand when we have subscribe then why poll is a
> > separate
> > > API.Why cant we pass a callback in subscribe itself?
> > >
> > >
> > > On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> > > >wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm looking for people to review the new consumers APIs. Patch is
> > posted
> > > at
> > > > https://issues.apache.org/jira/browse/KAFKA-1328
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > >
> >
>

Re: Review for the new consumer APIs

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Neha,

How will new Consumer help us with implementing following use case?



We have heartbeat as one of topics and all application servers publish
metric to this topic.  We have to meet near real-time consume SLA (less
than 30 seconds).

1) We would like to find out what is latest message per partition that
current consumer is connected?

2) If the consumer lags behind by certain offset or by time, consumer can
seek to particular offset(which we can use seek method for this).

3) How can we start a temp consumer for same partition to read messages
based on offset range (last consume offset from part 2 to current offset
that we jumped to in part 2) ?



Basically, is there a QOS concept per partition where consumer always needs
to consume latest message and detect a lag behind and start TEMP consumer
for back-fill.


How does Linked in handle the near real time consumption for operation
metrics ?


Thanks,


Bhavesh


On Sat, Apr 12, 2014 at 6:58 PM, Neha Narkhede <ne...@gmail.com>wrote:

> Why cant we pass a callback in subscribe itself?
>
> Mainly because it will make the processing kind of awkward since you need
> to access the other consumer APIs while processing the messages. Your
> suggestion does point out a problem with the poll() API though. Here is the
> initial proposal of the poll() API-
>
> List<ConsumerRecord> poll(long timeout, TimeUnit unit);
>
> The application subscribes to topics or partitions and expects to process
> messages per topic or per partition respectively. By just returning a list
> of ConsumerRecord objects, we make it difficult for the application to
> process messages naturally grouped by topic or partition. After some
> thought, I changed it to -
>
> Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);
>
> ConsumerRecordMetadata allows you to get records for a particular partition
> or get records for all partitions.
>
> The second change I made is to the commit APIs. To remain consistent with
> the Producer, I changed commit() to return a Future and got rid of
> commitAsync(). This will easily support the sync and async commit use
> cases.
>
> Map<TopicPartition,OffsetMetadata
> <
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html
> >>>
> commit(Map<TopicPartition,Long> offsets);
>
> I'm looking for feedback on these changes. I've published the new javadoc
> to the same location<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc>.
> Appreciate if someone can take a look.
>
> Thanks,
> Neha
>
>
> On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi <
> priyadarshi.pushkar@gmail.com> wrote:
>
> > Was trying to understand when we have subscribe then why poll is a
> separate
> > API.Why cant we pass a callback in subscribe itself?
> >
> >
> > On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> > >wrote:
> >
> > > Hi,
> > >
> > > I'm looking for people to review the new consumers APIs. Patch is
> posted
> > at
> > > https://issues.apache.org/jira/browse/KAFKA-1328
> > >
> > > Thanks,
> > > Neha
> > >
> >
>

Re: Review for the new consumer APIs

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Neha,

How will new Consumer help us with implementing following use case?



We have heartbeat as one of topics and all application servers publish
metric to this topic.  We have to meet near real-time consume SLA (less
than 30 seconds).

1) We would like to find out what is latest message per partition that
current consumer is connected?

2) If the consumer lags behind by certain offset or by time, consumer can
seek to particular offset(which we can use seek method for this).

3) How can we start a temp consumer for same partition to read messages
based on offset range (last consume offset from part 2 to current offset
that we jumped to in part 2) ?



Basically, is there a QOS concept per partition where consumer always needs
to consume latest message and detect a lag behind and start TEMP consumer
for back-fill.


How does Linked in handle the near real time consumption for operation
metrics ?


Thanks,


Bhavesh


On Sat, Apr 12, 2014 at 6:58 PM, Neha Narkhede <ne...@gmail.com>wrote:

> Why cant we pass a callback in subscribe itself?
>
> Mainly because it will make the processing kind of awkward since you need
> to access the other consumer APIs while processing the messages. Your
> suggestion does point out a problem with the poll() API though. Here is the
> initial proposal of the poll() API-
>
> List<ConsumerRecord> poll(long timeout, TimeUnit unit);
>
> The application subscribes to topics or partitions and expects to process
> messages per topic or per partition respectively. By just returning a list
> of ConsumerRecord objects, we make it difficult for the application to
> process messages naturally grouped by topic or partition. After some
> thought, I changed it to -
>
> Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);
>
> ConsumerRecordMetadata allows you to get records for a particular partition
> or get records for all partitions.
>
> The second change I made is to the commit APIs. To remain consistent with
> the Producer, I changed commit() to return a Future and got rid of
> commitAsync(). This will easily support the sync and async commit use
> cases.
>
> Map<TopicPartition,OffsetMetadata
> <
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html
> >>>
> commit(Map<TopicPartition,Long> offsets);
>
> I'm looking for feedback on these changes. I've published the new javadoc
> to the same location<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc>.
> Appreciate if someone can take a look.
>
> Thanks,
> Neha
>
>
> On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi <
> priyadarshi.pushkar@gmail.com> wrote:
>
> > Was trying to understand when we have subscribe then why poll is a
> separate
> > API.Why cant we pass a callback in subscribe itself?
> >
> >
> > On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> > >wrote:
> >
> > > Hi,
> > >
> > > I'm looking for people to review the new consumers APIs. Patch is
> posted
> > at
> > > https://issues.apache.org/jira/browse/KAFKA-1328
> > >
> > > Thanks,
> > > Neha
> > >
> >
>

Re: Review for the new consumer APIs

Posted by Neha Narkhede <ne...@gmail.com>.
Why cant we pass a callback in subscribe itself?

Mainly because it will make the processing kind of awkward since you need
to access the other consumer APIs while processing the messages. Your
suggestion does point out a problem with the poll() API though. Here is the
initial proposal of the poll() API-

List<ConsumerRecord> poll(long timeout, TimeUnit unit);

The application subscribes to topics or partitions and expects to process
messages per topic or per partition respectively. By just returning a list
of ConsumerRecord objects, we make it difficult for the application to
process messages naturally grouped by topic or partition. After some
thought, I changed it to -

Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);

ConsumerRecordMetadata allows you to get records for a particular partition
or get records for all partitions.

The second change I made is to the commit APIs. To remain consistent with
the Producer, I changed commit() to return a Future and got rid of
commitAsync(). This will easily support the sync and async commit use cases.

Map<TopicPartition,OffsetMetadata
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html>>>
commit(Map<TopicPartition,Long> offsets);

I'm looking for feedback on these changes. I've published the new javadoc
to the same location<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc>.
Appreciate if someone can take a look.

Thanks,
Neha


On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi <
priyadarshi.pushkar@gmail.com> wrote:

> Was trying to understand when we have subscribe then why poll is a separate
> API.Why cant we pass a callback in subscribe itself?
>
>
> On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > Hi,
> >
> > I'm looking for people to review the new consumers APIs. Patch is posted
> at
> > https://issues.apache.org/jira/browse/KAFKA-1328
> >
> > Thanks,
> > Neha
> >
>

Re: Review for the new consumer APIs

Posted by Neha Narkhede <ne...@gmail.com>.
Why cant we pass a callback in subscribe itself?

Mainly because it will make the processing kind of awkward since you need
to access the other consumer APIs while processing the messages. Your
suggestion does point out a problem with the poll() API though. Here is the
initial proposal of the poll() API-

List<ConsumerRecord> poll(long timeout, TimeUnit unit);

The application subscribes to topics or partitions and expects to process
messages per topic or per partition respectively. By just returning a list
of ConsumerRecord objects, we make it difficult for the application to
process messages naturally grouped by topic or partition. After some
thought, I changed it to -

Map<String, ConsumerRecordMetadata> poll(long timeout, TimeUnit unit);

ConsumerRecordMetadata allows you to get records for a particular partition
or get records for all partitions.

The second change I made is to the commit APIs. To remain consistent with
the Producer, I changed commit() to return a Future and got rid of
commitAsync(). This will easily support the sync and async commit use cases.

Map<TopicPartition,OffsetMetadata
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/OffsetMetadata.html>>>
commit(Map<TopicPartition,Long> offsets);

I'm looking for feedback on these changes. I've published the new javadoc
to the same location<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc>.
Appreciate if someone can take a look.

Thanks,
Neha


On Tue, Apr 8, 2014 at 9:50 PM, pushkar priyadarshi <
priyadarshi.pushkar@gmail.com> wrote:

> Was trying to understand when we have subscribe then why poll is a separate
> API.Why cant we pass a callback in subscribe itself?
>
>
> On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > Hi,
> >
> > I'm looking for people to review the new consumers APIs. Patch is posted
> at
> > https://issues.apache.org/jira/browse/KAFKA-1328
> >
> > Thanks,
> > Neha
> >
>

Re: Review for the new consumer APIs

Posted by pushkar priyadarshi <pr...@gmail.com>.
Was trying to understand when we have subscribe then why poll is a separate
API.Why cant we pass a callback in subscribe itself?


On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <ne...@gmail.com>wrote:

> Hi,
>
> I'm looking for people to review the new consumers APIs. Patch is posted at
> https://issues.apache.org/jira/browse/KAFKA-1328
>
> Thanks,
> Neha
>

Re: Review for the new consumer APIs

Posted by pushkar priyadarshi <pr...@gmail.com>.
Was trying to understand when we have subscribe then why poll is a separate
API.Why cant we pass a callback in subscribe itself?


On Mon, Apr 7, 2014 at 9:51 PM, Neha Narkhede <ne...@gmail.com>wrote:

> Hi,
>
> I'm looking for people to review the new consumers APIs. Patch is posted at
> https://issues.apache.org/jira/browse/KAFKA-1328
>
> Thanks,
> Neha
>