You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/22 15:08:29 UTC
kafka git commit: KAFKA-5303,
KAFKA-5305: Improve logging when fetches fail in ReplicaFetcherThread
Repository: kafka
Updated Branches:
refs/heads/trunk ceb10c533 -> c2ced5fb5
KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in ReplicaFetcherThread
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3115 from ijuma/kafka-5305-missing-log-info-replica-fetcher
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2ced5fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2ced5fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2ced5fb
Branch: refs/heads/trunk
Commit: c2ced5fb51f2a4ec94d158e18836afb7284d26ce
Parents: ceb10c5
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 22 16:08:06 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Mon May 22 16:08:06 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala | 3 ++-
core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++--
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 3 +++
3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ec60220..394132b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -128,6 +128,7 @@ object ConsumerFetcherThread {
}.toMap
def isEmpty: Boolean = underlying.requestInfo.isEmpty
def offset(topicPartition: TopicPartition): Long = tpToOffset(topicPartition)
+ override def toString = underlying.toString
}
class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData {
@@ -136,6 +137,6 @@ object ConsumerFetcherThread {
def highWatermark: Long = underlying.hw
def exception: Option[Throwable] =
if (error == Errors.NONE) None else Some(ErrorMapping.exceptionFor(error.code))
-
+ override def toString = underlying.toString
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 734c006..cb0680c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -147,12 +147,12 @@ abstract class AbstractFetcherThread(name: String,
var responseData: Seq[(TopicPartition, PD)] = Seq.empty
try {
- trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+ trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest")
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn(s"Error in fetch $fetchRequest", t)
+ warn(s"Error in fetch to broker ${sourceBroker.id}, request ${fetchRequest}", t)
inLock(partitionMapLock) {
partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
// there is an error occurred while fetching partitions, sleep a while
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index bd678b3..d7420dd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -349,6 +349,7 @@ object ReplicaFetcherThread {
def isEmpty: Boolean = underlying.fetchData().isEmpty
def offset(topicPartition: TopicPartition): Long =
underlying.fetchData().asScala(topicPartition).fetchOffset
+ override def toString = underlying.toString
}
private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -367,5 +368,7 @@ object ReplicaFetcherThread {
case Errors.NONE => None
case e => Some(e.exception)
}
+
+ override def toString = underlying.toString
}
}