Posted to dev@kafka.apache.org by "Ben Stopford (JIRA)" <ji...@apache.org> on 2017/01/18 18:26:26 UTC

[jira] [Work started] (KAFKA-4596) KIP-73 rebalance throttling breaks on plans for specific partitions

     [ https://issues.apache.org/jira/browse/KAFKA-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-4596 started by Ben Stopford.
-------------------------------------------
> KIP-73 rebalance throttling breaks on plans for specific partitions
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4596
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4596
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Kafka 0.10.1.1
>            Reporter: Tom Crayford
>            Assignee: Ben Stopford
>             Fix For: 0.10.2.0
>
>
> The reassign-partitions.sh command fails if you both *throttle* and give it a reassignment plan that covers only specific partitions. For example, when reassigning only {code}__consumer_offsets{code} partition 19, you get the following error, which refers to partition 30, a partition that is not in the plan:
> {code}
> Save this to use as the --reassignment-json-file option during rollback
> Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
> The throttle limit was set to 1048576 B/s
> Partitions reassignment failed due to key not found: [__consumer_offsets,30]
> java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:59)
>         at scala.collection.MapLike$class.apply(MapLike.scala:141)
>         at scala.collection.AbstractMap.apply(Map.scala:59)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
>         at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>         at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
>         at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
>         at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
>         at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
>         at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
>         at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
>         at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
>         at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> {code}
> This effectively breaks the throttling feature unless you want to rebalance a large number of partitions at once.
> For reference, the command that was run was:
> {code}
> kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
> {code}
> and the contents of the plan file are:
> {code}
> {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
> {code}
> This seems like a simple logic error to me: the code looks up a partition that was never proposed, which it should not do. It looks like the logic assumes that {code}Map.apply{code} doesn't throw if the key isn't found, when in fact it does.
> I checked that this cluster does indeed have the __consumer_offsets topic populated.
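
The suspected failure mode can be reproduced outside Kafka with a few lines of Scala. This is only a sketch under assumptions, not the actual ReassignPartitionsCommand code: the object name, the `(topic, partition)` tuple key, the existing replica lists, and the `movingUnsafe` / `movingSafe` helpers are all hypothetical. It shows that filtering the existing assignment with `proposed(tp)` throws for any partition missing from the plan, while `proposed.get(tp)` skips it.

```scala
import scala.util.Try

object MapLookupSketch {
  // Hypothetical stand-in for a topic/partition key: (topic, partition).
  type TopicPartition = (String, Int)

  // Assumed current assignment: partition -> replica broker ids (values are made up).
  val existing: Map[TopicPartition, Seq[Int]] = Map(
    ("__consumer_offsets", 19) -> Seq(0, 1, 2),
    ("__consumer_offsets", 30) -> Seq(1, 2, 0)
  )

  // The plan from the report proposes a change for partition 19 only.
  val proposed: Map[TopicPartition, Seq[Int]] = Map(
    ("__consumer_offsets", 19) -> Seq(2, 1, 0)
  )

  // Unsafe: proposed(tp) is Map.apply, which throws NoSuchElementException
  // as soon as the filter reaches partition 30, which is not in the plan.
  def movingUnsafe: Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed(tp) != replicas }

  // Safe: Map.get returns an Option, so partitions absent from the plan
  // are treated as "not moving" instead of raising an error.
  def movingSafe: Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, replicas) => proposed.get(tp).exists(_ != replicas) }

  def main(args: Array[String]): Unit = {
    println(s"unsafe lookup failed: ${Try(movingUnsafe).isFailure}")
    println(s"moving partitions (safe lookup): ${movingSafe.keySet}")
  }
}
```

Whether the real fix should skip unproposed partitions in this way, or restrict the lookup to the proposed partitions from the start, is a design decision for the patch; the sketch only illustrates the difference between `Map.apply` and `Map.get`.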


