Posted to dev@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2017/10/30 12:47:01 UTC

[jira] [Resolved] (KAFKA-2995) in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk

     [ https://issues.apache.org/jira/browse/KAFKA-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-2995.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported; please upgrade to the Java consumer whenever possible.

> in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2995
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2995
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Pengwei
>            Assignee: Neha Narkhede
>
> In version 0.9.0.0, the old consumer's commit interface is as follows:
> def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
>     trace("OffsetMap: %s".format(offsetsToCommit))
>     var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
>     var done = false
>     while (!done) {
>       val committed = offsetsChannelLock synchronized {
>         // committed when we receive either no error codes or only MetadataTooLarge errors
>         if (offsetsToCommit.size > 0) {
>           if (config.offsetsStorage == "zookeeper") {
>             offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
>               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
>             }
>       ........
> This interface does not validate the offsetsToCommit parameter. If offsetsToCommit contains a topic or partition that does not exist in Kafka, the commit creates an entry under /consumers/[group]/offsets/[nonexistent topic] in ZooKeeper.
> Should we check that every topic and partition in offsetsToCommit actually exists, or just check that it is contained in topicRegistry or checkpointedZkOffsets?
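
The check proposed above could look roughly like the following. This is only a minimal sketch, not code from Kafka: the two case classes are stand-ins for kafka.common.TopicAndPartition and kafka.common.OffsetAndMetadata, and splitKnown and knownPartitions are hypothetical names. How the set of known partitions would be derived from topicRegistry or checkpointedZkOffsets is deliberately left out.

```scala
// Stand-ins for kafka.common.TopicAndPartition and kafka.common.OffsetAndMetadata,
// reduced to the fields this sketch needs so it runs without Kafka on the classpath.
case class TopicAndPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long)

object OffsetValidation {
  // Hypothetical helper (not part of Kafka): splits the requested commits into
  // those whose topic-partition is in knownPartitions and those that are not.
  // Assumption: the caller builds knownPartitions from whatever the consumer
  // already tracks, e.g. topicRegistry or checkpointedZkOffsets.
  def splitKnown(
      offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata],
      knownPartitions: Set[TopicAndPartition]
  ): (Map[TopicAndPartition, OffsetAndMetadata], Map[TopicAndPartition, OffsetAndMetadata]) =
    offsetsToCommit.partition { case (tp, _) => knownPartitions.contains(tp) }
}
```

With such a split, commitOffsets could write only the first map to ZooKeeper and log or reject the second, so no node would be created for a topic the consumer does not know about.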



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)