You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/08 00:20:21 UTC

[kafka] branch trunk updated: KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 68ec1a3  KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
68ec1a3 is described below

commit 68ec1a3cce22706faf762de7d2d97b490c47fd90
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 7 16:20:18 2018 -0800

    KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
    
    Out of range and not leader errors are common in replica fetchers and not necessarily an indication of a problem. This patch therefore reduces the log level for log messages corresponding to these errors from `ERROR` to `INFO`. Additionally, this patch removes some redundant information in the log message which is already present in the log context.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  4 +--
 .../kafka/consumer/ConsumerFetcherThread.scala     |  8 ++++--
 .../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++++++---------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f5356..e84472f 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-    new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
-      config, sourceBroker, partitionMap, this)
+    new ConsumerFetcherThread(consumerIdString, fetcherId, config, sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc24..ac83fa1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
 
 @deprecated("This class has been deprecated and will be removed in a future release. " +
             "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+                            fetcherId: Int,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
                             partitionMap: Map[TopicPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name,
+        extends AbstractFetcherThread(name = s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       fetchBackOffMs = config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
+  this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, leaderId=${sourceBroker.id}, " +
+    s"fetcherId=$fetcherId] "
+
   private val clientId = config.clientId
   private val fetchSize = config.fetchMessageMaxBytes
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a7032..8d787c9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
                                      val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true,
-                                     includeLogTruncation: Boolean
-                                    )
+                                     includeLogTruncation: Boolean)
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
 
   private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = mutable.Set[TopicPartition]()
-
     var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
-      trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest")
+      trace(s"Sending fetch request $fetchRequest")
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning) {
-          warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
+          warn(s"Error in response for fetch request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionStates.partitionSet.asScala
             // there is an error occurred while fetching partitions, sleep a while
@@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String,
                   try {
                     val newOffset = handleOffsetOutOfRange(topicPartition)
                     partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
-                    error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset")
+                    info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
+                      s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
-                      error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e)
+                      error(s"Error getting offset for partition $topicPartition", e)
                       partitionsWithError += topicPartition
                   }
+
+                case Errors.NOT_LEADER_FOR_PARTITION =>
+                  info(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
+                    "that the partition is being moved")
+                  partitionsWithError += topicPartition
+
                 case _ =>
-                  if (isRunning) {
-                    error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
-                    partitionsWithError += topicPartition
-                  }
+                  error(s"Error for partition $topicPartition at offset ${currentPartitionFetchState.fetchOffset}",
+                    partitionData.exception.get)
+                  partitionsWithError += topicPartition
               }
             })
         }
       }
     }
 
-    if (partitionsWithError.nonEmpty)
-      debug(s"handling partitions with error for $partitionsWithError")
-    handlePartitionsWithErrors(partitionsWithError)
+    if (partitionsWithError.nonEmpty) {
+      debug(s"Handling errors for partitions $partitionsWithError")
+      handlePartitionsWithErrors(partitionsWithError)
+    }
   }
 
   def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.