Posted to dev@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2017/10/30 12:47:01 UTC
[jira] [Resolved] (KAFKA-2995) in 0.9.0.0 Old Consumer's
commitOffsets with specify partition can submit not exists topic and
partition to zk
[ https://issues.apache.org/jira/browse/KAFKA-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-2995.
------------------------------
Resolution: Auto Closed
Closing inactive issue. The old consumer is no longer supported; please upgrade to the Java consumer whenever possible.
> in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk
> --------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-2995
> URL: https://issues.apache.org/jira/browse/KAFKA-2995
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.0
> Reporter: Pengwei
> Assignee: Neha Narkhede
>
> In version 0.9.0.0, the old consumer's commit interface is as follows:
>   def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
>     trace("OffsetMap: %s".format(offsetsToCommit))
>     var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
>     var done = false
>     while (!done) {
>       val committed = offsetsChannelLock synchronized {
>         // committed when we receive either no error codes or only MetadataTooLarge errors
>         if (offsetsToCommit.size > 0) {
>           if (config.offsetsStorage == "zookeeper") {
>             offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
>               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
>             }
>   ........
> This interface does not validate the offsetsToCommit parameter. If offsetsToCommit contains a topic or partition that does not exist in Kafka, the commit creates an entry under /consumers/[group]/offsets/[nonexistent topic] in ZooKeeper.
> Should we check that every topic and partition in offsetsToCommit exists, or just check that it is contained in topicRegistry or checkpointedZkOffsets?
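
The check proposed in the report could look roughly like the sketch below. It is not code from Kafka: the two case classes are simplified stand-ins for the Kafka types of the same name, and the splitByOwnership helper and its ownedPartitions parameter are hypothetical. A plain Set stands in for whatever lookup the consumer would actually use (for example the topicRegistry mentioned above).

```scala
// Simplified stand-ins for the Kafka types named in the report,
// defined here only so the sketch compiles on its own.
final case class TopicAndPartition(topic: String, partition: Int)
final case class OffsetAndMetadata(offset: Long)

object OffsetCommitFilter {
  /**
   * Hypothetical helper: splits the requested commits into
   * (entries whose partition is known, entries whose partition is unknown).
   * `ownedPartitions` is an assumption standing in for the consumer's
   * own record of the partitions it tracks.
   */
  def splitByOwnership(
      offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata],
      ownedPartitions: Set[TopicAndPartition]
  ): (Map[TopicAndPartition, OffsetAndMetadata], Map[TopicAndPartition, OffsetAndMetadata]) =
    offsetsToCommit.partition { case (tp, _) => ownedPartitions.contains(tp) }
}
```

Under this approach only the first map of the result would be passed on to the ZooKeeper commit loop, and the second could be logged or rejected, so no node is created for a topic the consumer does not know about.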
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)