You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/08 00:20:21 UTC
[kafka] branch trunk updated: KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68ec1a3 KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
68ec1a3 is described below
commit 68ec1a3cce22706faf762de7d2d97b490c47fd90
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 7 16:20:18 2018 -0800
KAFKA-6519; Reduce log level for normal replica fetch errors (#4501)
Out-of-range and not-leader errors are common in replica fetchers and are not necessarily an indication of a problem. This patch therefore reduces the log level of the messages corresponding to these errors from `ERROR` to `INFO`. Additionally, this patch removes some redundant information from the log messages that is already present in the log context.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/consumer/ConsumerFetcherManager.scala | 4 +--
.../kafka/consumer/ConsumerFetcherThread.scala | 8 ++++--
.../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++++++---------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f5356..e84472f 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
- new ConsumerFetcherThread(
- "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
- config, sourceBroker, partitionMap, this)
+ new ConsumerFetcherThread(consumerIdString, fetcherId, config, sourceBroker, partitionMap, this)
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc24..ac83fa1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
@deprecated("This class has been deprecated and will be removed in a future release. " +
"Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+ fetcherId: Int,
val config: ConsumerConfig,
sourceBroker: BrokerEndPoint,
partitionMap: Map[TopicPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
- extends AbstractFetcherThread(name = name,
+ extends AbstractFetcherThread(name = s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
clientId = config.clientId,
sourceBroker = sourceBroker,
fetchBackOffMs = config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
type REQ = FetchRequest
type PD = PartitionData
+ this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, leaderId=${sourceBroker.id}, " +
+ s"fetcherId=$fetcherId] "
+
private val clientId = config.clientId
private val fetchSize = config.fetchMessageMaxBytes
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a7032..8d787c9 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
val sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true,
- includeLogTruncation: Boolean
- )
+ includeLogTruncation: Boolean)
extends ShutdownableThread(name, isInterruptible) {
type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
private def processFetchRequest(fetchRequest: REQ) {
val partitionsWithError = mutable.Set[TopicPartition]()
-
var responseData: Seq[(TopicPartition, PD)] = Seq.empty
try {
- trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest")
+ trace(s"Sending fetch request $fetchRequest")
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning) {
- warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
+ warn(s"Error in response for fetch request $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionStates.partitionSet.asScala
// there is an error occurred while fetching partitions, sleep a while
@@ -210,27 +208,34 @@ abstract class AbstractFetcherThread(name: String,
try {
val newOffset = handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
- error(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition out of range; reset offset to $newOffset")
+ info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
+ s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
- error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e)
+ error(s"Error getting offset for partition $topicPartition", e)
partitionsWithError += topicPartition
}
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ info(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
+ "that the partition is being moved")
+ partitionsWithError += topicPartition
+
case _ =>
- if (isRunning) {
- error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
- partitionsWithError += topicPartition
- }
+ error(s"Error for partition $topicPartition at offset ${currentPartitionFetchState.fetchOffset}",
+ partitionData.exception.get)
+ partitionsWithError += topicPartition
}
})
}
}
}
- if (partitionsWithError.nonEmpty)
- debug(s"handling partitions with error for $partitionsWithError")
- handlePartitionsWithErrors(partitionsWithError)
+ if (partitionsWithError.nonEmpty) {
+ debug(s"Handling errors for partitions $partitionsWithError")
+ handlePartitionsWithErrors(partitionsWithError)
+ }
}
def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.