Posted to dev@kafka.apache.org by "Sriharsha Chintalapani (JIRA)" <ji...@apache.org> on 2015/01/15 17:24:35 UTC

[jira] [Comment Edited] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

    [ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278895#comment-14278895 ] 

Sriharsha Chintalapani edited comment on KAFKA-1461 at 1/15/15 4:24 PM:
------------------------------------------------------------------------

[~guozhang] I had the following code in mind for backoff retries in case of any error. This code would go in ReplicaFetcherThread.handlePartitionsWithErrors.
I am thinking of maintaining two maps in ReplicaFetcherThread:
  private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset
  private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> timestamp
one for the last fetched offset and one for the error timestamp.
The idea is to remove the failing partitions from AbstractFetcherThread.partitionsMap and add them back once currentTime > partitionsWithErrorMap.timestamp + replicaFetcherRetryBackoffMs.
I am not quite sure about maintaining these two maps. If it looks ok to you, I'll send a patch, or if you have any other approach please let me know.

{code}
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
    // Record the error time and last fetched offset for each newly failing partition.
    for (partition <- partitions) {
      if (!partitionsWithErrorMap.contains(partition)) {
        partitionsWithErrorMap.put(partition, System.currentTimeMillis())
        currentOffset(partition) match {
          case Some(offset) => partitionsWithErrorStandbyMap.put(partition, offset)
          case None => // partition is no longer tracked by the fetcher; nothing to save
        }
      }
    }
    // Stop fetching the failing partitions while they back off.
    removePartitions(partitions.toSet)

    // Collect the partitions whose backoff time has elapsed, together with their saved offsets.
    val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
    partitionsWithErrorMap.foreach {
      case (topicAndPartition, timeMs) =>
        if (System.currentTimeMillis() > timeMs + brokerConfig.replicaFetcherRetryBackoffMs) {
          partitionsWithErrorStandbyMap.get(topicAndPartition) match {
            case Some(offset) => partitionsToBeAdded.put(topicAndPartition, offset)
            case None => // no saved offset; skip and let the next error refresh it
          }
        }
    }
    // Clear the bookkeeping for the partitions being added back, then resume fetching them.
    partitionsToBeAdded.keys.foreach { topicAndPartition =>
      partitionsWithErrorMap.remove(topicAndPartition)
      partitionsWithErrorStandbyMap.remove(topicAndPartition)
    }
    addPartitions(partitionsToBeAdded)
  }
{code}
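
To make the intended behavior easier to see in isolation, here is a minimal standalone sketch of the same two-map bookkeeping. TopicAndPartition, the 1000 ms backoff value, and the helper names below are illustration-only stand-ins, not the actual ReplicaFetcherThread / KafkaConfig definitions.

{code}
// Standalone sketch of the two-map backoff bookkeeping described above (illustration only).
object ReplicaBackoffSketch extends App {
  import scala.collection.mutable

  case class TopicAndPartition(topic: String, partition: Int)

  val retryBackoffMs = 1000L                                                        // stand-in for replicaFetcherRetryBackoffMs
  val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long]         // (topic, partition) -> error timestamp
  val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long]  // (topic, partition) -> saved offset

  // Record the first error time and the offset to resume from.
  def recordError(tp: TopicAndPartition, offset: Long) {
    partitionsWithErrorMap.getOrElseUpdate(tp, System.currentTimeMillis())
    partitionsWithErrorStandbyMap.getOrElseUpdate(tp, offset)
  }

  // Return the partitions whose backoff has elapsed (with their saved offsets)
  // and drop them from both maps, mirroring handlePartitionsWithErrors above.
  def partitionsReadyForRetry(): Map[TopicAndPartition, Long] = {
    val now = System.currentTimeMillis()
    val ready = partitionsWithErrorMap.collect {
      case (tp, errorTimeMs) if now > errorTimeMs + retryBackoffMs =>
        tp -> partitionsWithErrorStandbyMap.getOrElse(tp, 0L)
    }.toMap
    ready.keys.foreach { tp =>
      partitionsWithErrorMap.remove(tp)
      partitionsWithErrorStandbyMap.remove(tp)
    }
    ready
  }

  val tp = TopicAndPartition("test-topic", 0)
  recordError(tp, offset = 42L)
  println(partitionsReadyForRetry())   // Map() -- backoff has not elapsed yet
  Thread.sleep(retryBackoffMs + 100)
  println(partitionsReadyForRetry())   // Map(TopicAndPartition(test-topic,0) -> 42)
}
{code}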


> Replica fetcher thread does not implement any back-off behavior
> ---------------------------------------------------------------
>
>                 Key: KAFKA-1461
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1461
>             Project: Kafka
>          Issue Type: Improvement
>          Components: replication
>    Affects Versions: 0.8.1.1
>            Reporter: Sam Meder
>            Assignee: Sriharsha Chintalapani
>              Labels: newbie++
>             Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection refused exception leading to several replica fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed so that a leader can be re-discovered helps somewhat.


