Posted to users@kafka.apache.org by Joris Peeters <j....@wintoncapital.com> on 2016/01/07 11:02:23 UTC

committing particular offset with high level consumer?

We are using Kafka 0.8.2.1 (*), and have a kafka.javaapi.consumer.ConsumerConnector connected to a single topic (we are coding in Scala). We're essentially doing something as simple as:

while (it.hasNext) {
  val msg = it.next
  // do stuff
  consumer.commitOffsets
}

This works great, but for optimisation reasons (**) we want to refactor and have some control over which offsets we commit back. Is there a way to do that within the high level consumer? The API didn't immediately show anything, but I imagine this is a fairly common operation, so perhaps there is a way that I'm missing. (Sorry if this is clearly documented somewhere; I didn't stumble on it.)

Thanks!
-Joris.

(*) We could migrate to 0.9.0 if that helps.
(**) Right now we have M workers (in one consumer group) taking from N partitions. To get better load balancing, we want the N partitions to be consumed by a single work-dispenser (in some actor implementation) that distributes the work to the M workers according to availability. We then can't commit after every processed message anymore, because messages with a lower offset might still be in process, and those would not get redelivered after a restart.
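
To make the constraint in (**) concrete, here is a minimal sketch of the bookkeeping a work-dispenser would need for one partition. OffsetTracker and its methods are hypothetical names, not part of any Kafka API, and the sketch assumes it is only ever called from the single dispenser (it is not thread-safe). The offset it reports follows the usual Kafka convention that a committed offset is the position of the next message to read.

```scala
import scala.collection.mutable

/** Tracks in-flight offsets for ONE partition (hypothetical helper, not a Kafka API). */
final class OffsetTracker {
  // Offsets handed to workers and not yet acknowledged, kept sorted.
  private val inFlight = mutable.TreeSet.empty[Long]
  // Highest offset ever dispatched; -1 means nothing has been dispatched yet.
  private var highestDispatched = -1L

  /** Record that the message at `offset` was handed to a worker. */
  def dispatched(offset: Long): Unit = {
    inFlight += offset
    if (offset > highestDispatched) highestDispatched = offset
  }

  /** Record that a worker finished the message at `offset`. */
  def completed(offset: Long): Unit = inFlight -= offset

  /**
   * Offset that is safe to commit: the lowest offset still in process, or
   * one past the highest dispatched offset when nothing is in process.
   * None if nothing has been dispatched at all.
   */
  def committable: Option[Long] =
    if (highestDispatched < 0) None
    else Some(inFlight.headOption.getOrElse(highestDispatched + 1))
}
```

With offsets 10 to 13 dispatched, completing 12 first leaves the committable offset at 10, because 10 and 11 are still in process; it only moves past 12 once 10 and 11 have completed as well.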





Winton Capital Management Limited ("Winton") is a limited company registered in England and Wales with its registered offices at 16 Old Bailey, London, EC4M 7EG (Registered Company No. 3311531). Winton is authorised and regulated by the Financial Conduct Authority in the United Kingdom, registered as an investment adviser with the US Securities and Exchange Commission, registered with the US Commodity Futures Trading Commission and a member of the National Futures Association in the United States.

This communication, including any attachments, is confidential and may be privileged. This email is for use by the intended recipient only. If you receive it in error, please notify the sender and delete it. You should not copy or disclose all or any part of this email.

This email does not constitute an offer or solicitation and nothing contained in this email constitutes, and should not be construed as, investment advice. Prospective investors should request offering materials and consult their own advisers with respect to investment decisions and inform themselves as to applicable legal requirements, exchange control regulations and taxes in the countries of their citizenship, residence or domicile. Past performance is not indicative of future results.

Winton takes reasonable steps to ensure the accuracy and integrity of its communications, including emails. However Winton accepts no liability for any materials transmitted. Emails are not secure and cannot be guaranteed to be error free.

RE: committing particular offset with high level consumer?

Posted by Joris Peeters <j....@wintoncapital.com>.
Thanks, Ewen. I ended up migrating to 0.9.0.0 and using the new consumer's KafkaConsumer.commitSync(offsets) function, which looks like it's working perfectly. Nice feature.
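
For reference, a minimal sketch of what such a call can look like from Scala, assuming the 0.9 kafka-clients library is on the classpath. CommitExample, commitProcessed and the lastProcessed map are hypothetical names for illustration; only KafkaConsumer.commitSync, TopicPartition and OffsetAndMetadata come from the client library. Note the + 1: the committed offset is the position of the next message to read, not the last one processed.

```scala
import java.util.{HashMap => JHashMap}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object CommitExample {
  // `lastProcessed` maps each partition to the offset of the last message
  // that has been fully processed (hypothetical input, built by the caller).
  def commitProcessed[K, V](consumer: KafkaConsumer[K, V],
                            lastProcessed: Map[TopicPartition, Long]): Unit = {
    val offsets = new JHashMap[TopicPartition, OffsetAndMetadata]()
    lastProcessed.foreach { case (tp, offset) =>
      // Commit the position of the NEXT message to read for this partition.
      offsets.put(tp, new OffsetAndMetadata(offset + 1))
    }
    // Blocks until the commit succeeds or an unrecoverable error is thrown.
    consumer.commitSync(offsets)
  }
}
```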

-J

Re: committing particular offset with high level consumer?

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Joris,

The high level consumer now has this API:

def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean)

I believe that was added in the 0.9 versions, so it wouldn't be in 0.8.1 or
0.8.2 versions. The new consumer also supports this feature.

-Ewen
