Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/11/04 01:10:58 UTC

[jira] [Commented] (KAFKA-2284) ConsumerRebalanceListener receives wrong type in partitionOwnership values

    [ https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15634847#comment-15634847 ] 

ASF GitHub Bot commented on KAFKA-2284:
---------------------------------------

GitHub user leachbj opened a pull request:

    https://github.com/apache/kafka/pull/2102

    KAFKA-2284: corrects value type in beforeReleasingPartitions

    Previously the values in the map were Scala Set values; now they
    are correctly java.util.Set, as advertised by the API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/leachbj/kafka 2284-java-set-beforeReleasingPartitions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2102
    
----
commit 3f249614da15cc9239b4aec55f633a1f146359ea
Author: Bernard Leach <le...@bouncycastle.org>
Date:   2016-11-04T01:05:15Z

    KAFKA-2284: corrects value type in beforeReleasingPartitions
    
    Previously the values in the map were Scala Set values; now they
    are correctly java.util.Set, as advertised by the API.

----


> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-2284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.2.0
>            Reporter: E. Sammer
>            Assignee: Neha Narkhede
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is supposed to receive an argument of type Map<String, Set<Integer>> (topic -> Set(partitions)). Even though the type of the map value is specified as java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed instead. That class does not implement java.util.Set, so a ClassCastException is thrown as soon as one attempts to access any value of the map. It looks as if this method was never tested against the actual types specified by the interface.
> Here's what happens if you call {{Set<T> foo = partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
> 	at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
> 	at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}
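
For anyone wondering why the exception only appears when a value is read, and not when the map is handed to the listener: Java generics are erased at runtime, so a map declared as Map<String, Set<Integer>> can carry values of any type until the caller assigns one to a Set variable. The following is a minimal sketch of that failure mode in plain Java, without Kafka or Scala. The class and method names are hypothetical, and an ArrayList stands in for the Scala JSetWrapper; it is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical demo class; not part of Kafka.
final class ErasureDemo {

    // Stand-in for the producer side: builds a map whose values are NOT
    // java.util.Set, then returns it under the advertised generic type.
    // (Assumption: ArrayList plays the role of the Scala JSetWrapper.)
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> pollutedOwnership() {
        Map raw = new HashMap();
        List<Integer> notASet = new ArrayList<>();
        notASet.add(0);
        raw.put("topic", notASet); // raw types bypass the generic type check
        return raw;                // unchecked conversion; nothing is verified at runtime
    }

    // Stand-in for the listener side: reads a value as the declared type.
    static String describeAccess() {
        Map<String, Set<Integer>> ownership = pollutedOwnership();
        // No failure yet: the value's type has not been inspected.
        boolean present = ownership.containsKey("topic");
        try {
            // The compiler inserts a cast to Set here, and that cast fails.
            Set<Integer> partitions = ownership.get("topic");
            return "ok: " + partitions;
        } catch (ClassCastException e) {
            return "ClassCastException (key present: " + present + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeAccess());
    }
}
```

Running main prints "ClassCastException (key present: true)", which mirrors the report: the map itself looks fine, and the cast fails on the first value access. This is also why converting the values to real java.util.Set instances before invoking the listener, as the pull request describes, addresses the problem at its source.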



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)