Posted to users@kafka.apache.org by 杨光 <la...@gmail.com> on 2018/01/16 11:25:15 UTC
consumer.seekToBeginning() will disable "enable.auto.commit"
Hi All,
I'm using Kafka's manual partition assignment API to read a Kafka topic.
I found that if I call the "seekToBeginning" method, the consumer does
not auto-commit offsets to Kafka, even when "enable.auto.commit" is "true".
My code is as follows:
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
props.put("group.id", groupid);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", autooffsetreset);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "aaa");
props.put("sasl.mechanism", "GSSAPI");
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicPartition(topic, 0), (long) -1);
fromOffsets.put(new TopicPartition(topic, 1), (long) -1);
fromOffsets.put(new TopicPartition(topic, 2), (long) -1);
fromOffsets.put(new TopicPartition(topic, 3), (long) -1);
fromOffsets.put(new TopicPartition(topic, 4), (long) -1);
fromOffsets.put(new TopicPartition(topic, 5), (long) -1);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(fromOffsets.keySet());
consumer.seekToBeginning(fromOffsets.keySet());
int i = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (records.isEmpty()) {
        System.out.println("i is " + i);
        consumer.commitSync();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    for (ConsumerRecord<String, String> record : records) {
        i++;
    }
    .......
Is there something I should know to avoid this problem?
Thanks!