You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/08 00:22:00 UTC

[jira] [Commented] (KAFKA-6519) Change log level from ERROR to WARN for not leader for this partition exception

    [ https://issues.apache.org/jira/browse/KAFKA-6519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356277#comment-16356277 ] 

ASF GitHub Bot commented on KAFKA-6519:
---------------------------------------

hachikuji closed pull request #4501: KAFKA-6519: Reduce log level for normal replica fetch errors
URL: https://github.com/apache/kafka/pull/4501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f53569a6b..e84472f06bb 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-    new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
-      config, sourceBroker, partitionMap, this)
+    new ConsumerFetcherThread(consumerIdString, fetcherId, config, sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc249bf3..ac83fa17a76 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
 
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+                            fetcherId: Int,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
                             partitionMap: Map[TopicPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name,
+        extends AbstractFetcherThread(name = s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       fetchBackOffMs = config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
+  this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, leaderId=${sourceBroker.id}, " +
+    s"fetcherId=$fetcherId] "
+
   private val clientId = config.clientId
   private val fetchSize = config.fetchMessageMaxBytes
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a70321a6e..8d787c96da6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
                                      val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true,
-                                     includeLogTruncation: Boolean
-                                    )
+                                     includeLogTruncation: Boolean)
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
 
   private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = mutable.Set[TopicPartition]()
-
     var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
-      trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest")
+      trace(s"Sending fetch request $fetchRequest")
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning) {
-          warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
+          warn(s"Error in response for fetch request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionStates.partitionSet.asScala
             // there is an error occurred while fetching partitions, sleep a while
@@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String,
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
-                    error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset")
+                    info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
+                      s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
-                      error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e)
+                      error(s"Error getting offset for partition $topicPartition", e)
                       partitionsWithError += topicPartition
                   }
+
+                case Errors.NOT_LEADER_FOR_PARTITION =>
+                  info(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
+                    "that the partition is being moved")
+                  partitionsWithError += topicPartition
+
                 case _ =>
-                  if (isRunning) {
-                    error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
-                    partitionsWithError += topicPartition
-                  }
+                  error(s"Error for partition $topicPartition at offset ${currentPartitionFetchState.fetchOffset}",
+                    partitionData.exception.get)
+                  partitionsWithError += topicPartition
               }
             })
         }
       }
     }
 
-    if (partitionsWithError.nonEmpty)
-      debug(s"handling partitions with error for $partitionsWithError")
-    handlePartitionsWithErrors(partitionsWithError)
+    if (partitionsWithError.nonEmpty) {
+      debug(s"Handling errors for partitions $partitionsWithError")
+      handlePartitionsWithErrors(partitionsWithError)
+    }
   }
 
   def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Change log level from ERROR to WARN for not leader for this partition exception
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6519
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6519
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 1.0.0
>            Reporter: Antony Stubbs
>            Assignee: Jason Gustafson
>            Priority: Major
>
> Not the leader for this partition is not an error in operation and is in fact expected and a apart of the partition discovery / movement system. This confuses users because they think something is going wrong. I'd suggest at least changing it to WARN, but perhaps is it even something users should be warned about?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)