Posted to dev@kafka.apache.org by "Sriharsha Chintalapani (JIRA)" <ji...@apache.org> on 2015/01/15 17:22:35 UTC
[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior
[ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278895#comment-14278895 ]
Sriharsha Chintalapani commented on KAFKA-1461:
-----------------------------------------------
[~guozhang] I had the following code in mind for backoff retries in case of any error. This code would go in ReplicaFetcherThread.handlePartitionsWithErrors.
I am thinking of maintaining two maps in ReplicaFetcherThread:
private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset
private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> timestamp
One map holds the offset and the other holds the timestamp of the error.
Partitions with errors are removed from AbstractFetcherThread.partitionsMap and added back to it once currentTime > partitionsWithErrorMap.timestamp + replicaFetcherRetryBackoffMs.
I am not quite sure about maintaining these two maps. If it looks OK to you, I'll send a patch; if you have another approach, please let me know.
```code
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
  val now = System.currentTimeMillis()
  // Record the error time and the current offset of each newly failed partition.
  for (partition <- partitions) {
    if (!partitionsWithErrorMap.contains(partition)) {
      currentOffset(partition) match {
        case Some(offset) =>
          partitionsWithErrorMap.put(partition, now)
          partitionsWithErrorStandbyMap.put(partition, offset)
        case None => // no offset known for this partition, so nothing to restore later
      }
    }
  }
  // Stop fetching the failed partitions while they back off.
  removePartitions(partitions.toSet)

  // Process partitionsWithErrorMap and add partitions back if the backoff time has elapsed.
  val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
  for ((topicAndPartition, timeMs) <- partitionsWithErrorMap) {
    if (now > timeMs + brokerConfig.replicaFetcherRetryBackoffMs) {
      partitionsWithErrorStandbyMap.remove(topicAndPartition).foreach { offset =>
        partitionsToBeAdded.put(topicAndPartition, offset)
      }
    }
  }
  // Clear the timestamps as well, so a later error on the same partition starts a new backoff.
  partitionsWithErrorMap --= partitionsToBeAdded.keys
  addPartitions(partitionsToBeAdded)
}
```
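The bookkeeping above can also be tried outside the broker. Below is a rough, self-contained sketch of the same idea, not the actual patch: ErrorBackoffTracker, park, releaseExpired and isParked are hypothetical names, TopicAndPartition is redefined locally as a stand-in for the Kafka class, and the clock is passed in only so the timing can be exercised without sleeping. It keeps the offset and the timestamp in a single map rather than two, which is one possible simplification and not necessarily the right one for the fetcher thread.

```scala
import scala.collection.mutable

// Local stand-in for kafka.common.TopicAndPartition so the sketch compiles on its own.
final case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical helper (not part of Kafka): tracks partitions that are backing off.
final class ErrorBackoffTracker(backoffMs: Long, clock: () => Long) {
  // (topic, partition) -> (offset to resume from, time the error was recorded)
  private val parked = mutable.HashMap.empty[TopicAndPartition, (Long, Long)]

  // Park a partition; one that is already parked keeps its original timestamp.
  def park(tp: TopicAndPartition, offset: Long): Unit =
    if (!parked.contains(tp)) parked.put(tp, (offset, clock()))

  // Remove and return the partitions whose backoff has elapsed, with their offsets.
  def releaseExpired(): Map[TopicAndPartition, Long] = {
    val now = clock()
    val expired = parked.collect {
      case (tp, (offset, since)) if now > since + backoffMs => tp -> offset
    }.toMap
    parked --= expired.keys
    expired
  }

  def isParked(tp: TopicAndPartition): Boolean = parked.contains(tp)
}

object BackoffDemo extends App {
  var now = 0L // fake clock, advanced by hand
  val tracker = new ErrorBackoffTracker(backoffMs = 1000L, clock = () => now)
  val tp = TopicAndPartition("events", 0)

  tracker.park(tp, offset = 42L)
  now = 500L
  println(tracker.releaseExpired()) // still backing off, nothing is released
  now = 1001L
  println(tracker.releaseExpired()) // backoff elapsed, the partition and its offset come back
}
```

In the fetcher thread, park would correspond to the point where removePartitions is called and releaseExpired to the point where addPartitions is called. Note that, as in the code above, nothing is released unless something calls releaseExpired, so a periodic check may still be needed if no further errors occur.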
> Replica fetcher thread does not implement any back-off behavior
> ---------------------------------------------------------------
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
> Issue Type: Improvement
> Components: replication
> Affects Versions: 0.8.1.1
> Reporter: Sam Meder
> Assignee: Sriharsha Chintalapani
> Labels: newbie++
> Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection refused exception leading to several replica fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed so a leader can be re-discovered helps some.