You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/09 19:11:33 UTC

samza git commit: SAMZA-920: BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely

Repository: samza
Updated Branches:
  refs/heads/master 2a531b0bb -> 57aae364b


SAMZA-920: BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57aae364
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57aae364
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57aae364

Branch: refs/heads/master
Commit: 57aae364bb919afcf25164e0204d3a776c779411
Parents: 2a531b0
Author: Ivan Simoneko <si...@gmail.com>
Authored: Sat Apr 9 10:11:02 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Sat Apr 9 10:11:02 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/57aae364/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 9aa9818..cbb8881 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -214,7 +214,8 @@ class BrokerProxy(
    * TopicAndPartition.
    */
   def abdicateAll {
-    nextOffsets.keySet.foreach(abdicate(_))
+    val immutableNextOffsetsCopy = nextOffsets.toMap
+    immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
 
   def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {