You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2018/04/12 15:11:02 UTC

[jira] [Resolved] (KAFKA-2284) ConsumerRebalanceListener receives wrong type in partitionOwnership values

     [ https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-2284.
------------------------------
    Resolution: Auto Closed

The Scala consumers have been deprecated and no further work is planned, please upgrade to the Java consumer whenever possible.

> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-2284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.2.0
>            Reporter: E. Sammer
>            Assignee: Neha Narkhede
>            Priority: Major
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is supposed to receive an arg of Map<String, Set<Integer>> (topic -> Set(partitions)). Even though the type of the map value is specified as a java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed instead which does not implement Set<T> causing a class cast exception as soon as one attempts to access any value of the map. It looks as if this method was never tested against the actual types specified by the interface.
> Here's what happens if you call {{Set<T> foo = partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
> 	at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
> 	at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)