Posted to user@flink.apache.org by 孙森 <se...@creditease.cn> on 2018/12/05 02:47:48 UTC

Re:kafka connector[specificStartOffset cannot auto commit offset to zookeeper]

Hi all,
        I specify the exact offsets the consumer should start from for each partition, but the Kafka consumer does not periodically commit the offsets to ZooKeeper.
I have disabled checkpointing; a checkpoint is taken only when the job is stopped. This is my code:



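import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

// Kafka consumer properties, read from the job config
val properties = new Properties()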
properties.setProperty("bootstrap.servers", config.kafka_input.kafka_base_config.brokers)
properties.setProperty("zookeeper.connect", config.zookeeper_address)
properties.setProperty("group.id", config.kafka_input.groupId)
properties.setProperty("session.timeout.ms", config.kafka_input.sessionTimeout)
properties.setProperty("enable.auto.commit", config.kafka_input.autoCommit.toString)
val flinkxConfigUtils = new WormholeFlinkxConfigUtils(config)
val topics = flinkxConfigUtils.getKafkaTopicList
val myConsumer = new FlinkKafkaConsumer010[(String, String, String, Int, Long)](topics, new WormholeDeserializationStringSchema, properties)

// Start each partition from an explicitly specified offset
val specificStartOffsets = flinkxConfigUtils.getTopicPartitionOffsetMap
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)


Can anyone explain the problem?
Thanks very much!