You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/04/09 19:11:33 UTC
samza git commit: SAMZA-920: BrokerProxy.abdicateAll can get stuck on
adding and removing the same partitions infinitely
Repository: samza
Updated Branches:
refs/heads/master 2a531b0bb -> 57aae364b
SAMZA-920: BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57aae364
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57aae364
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57aae364
Branch: refs/heads/master
Commit: 57aae364bb919afcf25164e0204d3a776c779411
Parents: 2a531b0
Author: Ivan Simoneko <si...@gmail.com>
Authored: Sat Apr 9 10:11:02 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Sat Apr 9 10:11:02 2016 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/samza/system/kafka/BrokerProxy.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/57aae364/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 9aa9818..cbb8881 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -214,7 +214,8 @@ class BrokerProxy(
* TopicAndPartition.
*/
def abdicateAll {
- nextOffsets.keySet.foreach(abdicate(_))
+ val immutableNextOffsetsCopy = nextOffsets.toMap
+ immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
}
def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {