Posted to commits@samza.apache.org by "Ivan Simoneko (JIRA)" <ji...@apache.org> on 2016/03/29 15:22:25 UTC

[jira] [Commented] (SAMZA-920) BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely

    [ https://issues.apache.org/jira/browse/SAMZA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15215992#comment-15215992 ] 

Ivan Simoneko commented on SAMZA-920:
-------------------------------------

In {{BrokerProxy.abdicateAll}} we iterate over all entries of the {{nextOffsets}} map. On each iteration we remove the current entry from the map, and at the same time {{KafkaSystemConsumer.refreshBrokers}} adds it back, because the current BrokerProxy is still the leader for that partition. With a ConcurrentHashMap it is not guaranteed whether an iterator will see newly added items or not. As the logs show, sometimes it does, and in that case the current BrokerProxy thread gets stuck removing and re-adding the same topic-partitions, never finishing the iteration over {{nextOffsets}}.

To fix it, I suggest iterating over an immutable copy of {{nextOffsets}}.
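
A minimal sketch of the snapshot idea, written in Java for illustration only. It is not the attached patch and not the actual Samza code: the class {{AbdicateSketch}}, the {{onAbdicate}} callback and the String keys are hypothetical stand-ins for BrokerProxy, the abdicate/refreshBrokers path and the topic-partition type. The loop walks a private copy of the map, so entries that the callback adds back to the live map cannot extend the iteration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical stand-in for BrokerProxy; names and types are assumptions.
final class AbdicateSketch {
    // Stand-in for nextOffsets: topic-partition name -> next offset to read.
    final Map<String, Long> nextOffsets = new ConcurrentHashMap<>();

    // Abdicates every partition that was present when the call started.
    // onAbdicate stands in for the abdicate/refreshBrokers path and is
    // allowed to put the same partition back into nextOffsets.
    // Returns the number of partitions that were abdicated.
    int abdicateAll(BiConsumer<String, Long> onAbdicate) {
        // Copy first: the loop below iterates over this snapshot only,
        // so later changes to the live map cannot affect the iteration.
        Map<String, Long> snapshot = new HashMap<>(nextOffsets);
        int abdicated = 0;
        for (String topicPartition : snapshot.keySet()) {
            // Remove from the live map; null means it was already gone.
            Long offset = nextOffsets.remove(topicPartition);
            if (offset != null) {
                onAbdicate.accept(topicPartition, offset);
                abdicated++;
            }
        }
        return abdicated;
    }
}
```

With two partitions in the map and a callback that immediately re-adds each one, this version makes exactly two passes through the loop and returns, instead of chasing the re-added entries.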

> BrokerProxy.abdicateAll can get stuck on adding and removing the same partitions infinitely
> -------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-920
>                 URL: https://issues.apache.org/jira/browse/SAMZA-920
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.1, 0.10.1
>            Reporter: Ivan Simoneko
>              Labels: easyfix
>         Attachments: SAMZA-920_v1.patch
>
>
> abdicateAll iterates over the ConcurrentHashMap nextOffsets while removing and re-adding elements, which can result in an infinite iteration.
> {code}
> 2016-03-20 20:25:41,413 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for [<TOPIC_PARTITION_1>].
> 2016-03-20 20:25:41,414 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: Map([<TOPIC_PARTITION_1>] -> 20749911)
> 2016-03-20 20:25:41,414 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition [<TOPIC_PARTITION_1>] to queue for <HOST>
> 2016-03-20 20:25:41,414 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Validating offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]
> 2016-03-20 20:25:41,428 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Able to successfully read from offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]. Using it to instantiate consumer.
> 2016-03-20 20:25:41,428 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Got offset 20749911 for new topic and partition [<TOPIC_PARTITION_1>].
> 2016-03-20 20:25:41,428 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring.
> 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition ([<TOPIC_PARTITION_1>]) for (BrokerProxy for <HOST>:<PORT>)
> 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Removed [<TOPIC_PARTITION_2>]
> 2016-03-20 20:25:41,429 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for [<TOPIC_PARTITION_2>].
> 2016-03-20 20:25:41,429 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: Map([<TOPIC_PARTITION_2>] -> 20749909)
> 2016-03-20 20:25:41,429 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition [<TOPIC_PARTITION_2>] to queue for <HOST>
> 2016-03-20 20:25:41,429 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Validating offset 20749909 for topic and partition [<TOPIC_PARTITION_2>]
> 2016-03-20 20:25:41,444 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Able to successfully read from offset 20749909 for topic and partition [<TOPIC_PARTITION_2>]. Using it to instantiate consumer.
> 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Got offset 20749909 for new topic and partition [<TOPIC_PARTITION_2>].
> 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring.
> 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition ([<TOPIC_PARTITION_2>]) for (BrokerProxy for <HOST>:<PORT>)
> 2016-03-20 20:25:41,444 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Removed [<TOPIC_PARTITION_1>]
> 2016-03-20 20:25:41,444 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Abdicating for [<TOPIC_PARTITION_1>].
> 2016-03-20 20:25:41,444 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Refreshing brokers for: Map([<TOPIC_PARTITION_1>] -> 20749911)
> 2016-03-20 20:25:41,445 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Adding new topic and partition [<TOPIC_PARTITION_1>] to queue for <HOST>
> 2016-03-20 20:25:41,445 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Validating offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]
> 2016-03-20 20:25:41,460 INFO  [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.GetOffset - Able to successfully read from offset 20749911 for topic and partition [<TOPIC_PARTITION_1>]. Using it to instantiate consumer.
> 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Got offset 20749911 for new topic and partition [<TOPIC_PARTITION_1>].
> 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.BrokerProxy - Tried to start an already started broker proxy (BrokerProxy for <HOST>:<PORT>). Ignoring.
> 2016-03-20 20:25:41,460 DEBUG [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at <HOST>:<PORT> for client <CLIENT_ID>] org.apache.samza.system.kafka.KafkaSystemConsumer - Claimed topic-partition ([<TOPIC_PARTITION_1>]) for (BrokerProxy for <HOST>:<PORT>)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)