You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by 杨光 <la...@gmail.com> on 2018/01/16 11:25:15 UTC

consumer.seekToBeginning() will disable "enable.auto.commit"

Hi All,
I'm using  kafka Manual Partition Assignment api to read kafka topic.
I found that  if i use the "seekToBeginning" method ,the consumer will
not auto commit offset to kafka even if  the "enable.auto.commit" is "true".

My code like next:


Properties props = new Properties();

props.put("bootstrap.servers", "host:9092");

props.put("group.id", groupid);

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer");

props.put("auto.offset.reset", autooffsetreset);


props.put("security.protocol", "SASL_PLAINTEXT");

props.put("sasl.kerberos.service.name", "aaa");

props.put("sasl.mechanism", "GSSAPI");


Map<TopicPartition, Long> fromOffsets = new HashMap<>();

fromOffsets.put(new TopicPartition(topic, 0), (long) -1);

fromOffsets.put(new TopicPartition(topic, 1), (long) -1);

fromOffsets.put(new TopicPartition(topic, 2), (long) -1);

fromOffsets.put(new TopicPartition(topic, 3), (long) -1);

fromOffsets.put(new TopicPartition(topic, 4), (long) -1);

fromOffsets.put(new TopicPartition(topic, 5), (long) -1);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


consumer.assign(fromOffsets.keySet());

consumer.seekToBeginning(fromOffsets.keySet());


int i = 0;


while (true) {


ConsumerRecords<String, String> records = consumer.poll(1000);


if (records.isEmpty()) {

System.out.println("i is " + i);

consumer.commitSync();


try {

Thread.sleep(10 * 1000);

} catch (InterruptedException e) {


e.printStackTrace();

}

}

for (ConsumerRecord<String, String> record : records) {

i++;

}
.......



Is there something i should know  to avoid this problem ?
Thanks!