You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Anjani Gupta <an...@salesforce.com> on 2016/12/06 18:22:09 UTC

How to disable auto commit for SimpleConsumer kafka 0.8.1

I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1
version.For High level consumer, config options can be set and passed via
consumerConfig as follows
kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);

How can I achieve the same for SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties
and restarted kafka server, zookeeper and producer. But, that does not
work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is how my code looks like

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new
 OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
correlationId,    clientName));


Map<TopicAndPartition, OffsetMetadataAndError> offsets =
offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
           .addFetch(a_topic, a_partition,
offsets.get(topicAndPartition).offset(), 100000)   .build();

long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);

//Consume messages from fetchResponse


Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new
HashMap<>  ();
requestInfo.put(topicAndPartition, new
OffsetMetadataAndError(readOffset, "metadata", (short)0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new
        OffsetCommitRequest("testGroup", requestInfo, (short)0,
correlationId, clientName));


If above code crashes before committing offset, I still get latest offset
as result of offsets.get(topicAndPartition).offset() in next run which
makes me to think that auto commit of offset happens as code is executed.

Re: How to disable auto commit for SimpleConsumer kafka 0.8.1

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
The simple consumer doesn't do auto-commit. It really only issues
individual low-level Kafka protocol request types, so `commitOffsets` is
the only way it should write offsets.

Is it possible it crashed after the request was sent but before finishing
reading the response?

Side-note: I know you mentioned 0.8.1, but if at all possible, we'd highly
recommend moving to the new consumer if at all possible. It supports both
simple and consumer group modes and is what will be supported in the long
term moving forward.

-Ewen

On Tue, Dec 6, 2016 at 12:47 PM, Anjani Gupta <an...@salesforce.com>
wrote:

> I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1
> version.For High level consumer, config options can be set and passed via
> consumerConfig as follows kafka.consumer.Consumer.
> createJavaConsumerConnector(this.consumerConfig);
>
> How can I achieve the same for SimpleConsumer? I mainly want to disable
> auto commit. I tried setting auto commit to false in consumer.properties
> and restarted kafka server, zookeeper and producer. But, that does not
> work. I think I need to apply this setting through code, not in
> consumer.properties. Can anyone help here?
>
> Here is how my code looks like
>
> List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
> topicAndPartitionList.add(topicAndPartition);
> OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new
>  OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
> correlationId,    clientName));
>
> Map<TopicAndPartition, OffsetMetadataAndError> offsets =
> offsetFetchResponse.offsets();
> FetchRequest req = new FetchRequestBuilder() .clientId(clientName)
>            .addFetch(a_topic, a_partition,
> offsets.get(topicAndPartition).offset(), 100000)   .build();
> long readOffset = offsets.get(topicAndPartition).offset();
> FetchResponse fetchResponse = consumer.fetch(req);
>
> //Consume messages from fetchResponse
>
>
> Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new
> HashMap<>  ();
> requestInfo.put(topicAndPartition, new
> OffsetMetadataAndError(readOffset, "metadata", (short)0));
> OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new
>         OffsetCommitRequest("testGroup", requestInfo, (short)0,
> correlationId, clientName));
>
>
> If above code crashes before committing offset, I still get latest offset
> as result of offsets.get(topicAndPartition).offset() in next run which
> makes me to think that auto commit of offset happens as code is executed.
>



-- 
Thanks,
Ewen

Fwd: How to disable auto commit for SimpleConsumer kafka 0.8.1

Posted by Anjani Gupta <an...@salesforce.com>.
I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1
version.For High level consumer, config options can be set and passed via
consumerConfig as follows kafka.consumer.Consumer.
createJavaConsumerConnector(this.consumerConfig);

How can I achieve the same for SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties
and restarted kafka server, zookeeper and producer. But, that does not
work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is how my code looks like

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new
 OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
correlationId,    clientName));


Map<TopicAndPartition, OffsetMetadataAndError> offsets =
offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
           .addFetch(a_topic, a_partition,
offsets.get(topicAndPartition).offset(), 100000)   .build();

long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);

//Consume messages from fetchResponse


Map<TopicAndPartition, OffsetMetadataAndError > requestInfo = new
HashMap<>  ();
requestInfo.put(topicAndPartition, new
OffsetMetadataAndError(readOffset, "metadata", (short)0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new
        OffsetCommitRequest("testGroup", requestInfo, (short)0,
correlationId, clientName));


If above code crashes before committing offset, I still get latest offset
as result of offsets.get(topicAndPartition).offset() in next run which
makes me to think that auto commit of offset happens as code is executed.