You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/19 11:08:19 UTC
[kafka] branch trunk updated: MINOR: Some logging improvements for debugging delayed produce status (#4691)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7041e76 MINOR: Some logging improvements for debugging delayed produce status (#4691)
7041e76 is described below
commit 7041e76bd6ef0c28ec8de2d07f2208171745f936
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Mar 19 04:08:12 2018 -0700
MINOR: Some logging improvements for debugging delayed produce status (#4691)
A few small logging improvements which help debugging replication issues.
---
core/src/main/scala/kafka/cluster/Partition.scala | 29 +++++++++-------------
.../main/scala/kafka/server/DelayedProduce.scala | 10 ++++----
.../scala/kafka/server/LogOffsetMetadata.scala | 2 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 2 +-
4 files changed, 19 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 68faf00..6eb9611 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -404,22 +404,16 @@ class Partition(val topic: String,
// keep the current immutable replica list reference
val curInSyncReplicas = inSyncReplicas
- def numAcks = curInSyncReplicas.count { r =>
- if (!r.isLocal)
- if (r.logEndOffset.messageOffset >= requiredOffset) {
- trace(s"Replica ${r.brokerId} received offset $requiredOffset")
- true
- }
- else
- false
- else
- true /* also count the local (leader) replica */
+ if (isTraceEnabled) {
+ def logEndOffsetString(r: Replica) = s"broker ${r.brokerId}: ${r.logEndOffset.messageOffset}"
+ val (ackedReplicas, awaitingReplicas) = curInSyncReplicas.partition { replica =>
+ replica.logEndOffset.messageOffset >= requiredOffset
+ }
+ trace(s"Progress awaiting ISR acks for offset $requiredOffset: acked: ${ackedReplicas.map(logEndOffsetString)}, " +
+ s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
}
- trace(s"$numAcks acks satisfied with acks = -1")
-
val minIsr = leaderReplica.log.get.config.minInSyncReplicas
-
if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
/*
* The topic may be configured not to accept messages if there are not enough replicas in ISR
@@ -468,9 +462,10 @@ class Partition(val topic: String,
leaderReplica.highWatermark = newHighWatermark
debug(s"High watermark updated to $newHighWatermark")
true
- } else {
- debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark." +
- s"All LEOs are ${allLogEndOffsets.mkString(",")}")
+ } else {
+ def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffset}"
+ debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
+ s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")
false
}
}
@@ -482,7 +477,7 @@ class Partition(val topic: String,
*/
def lowWatermarkIfLeader: Long = {
if (!isLeaderReplicaLocal)
- throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
+ throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
val logStartOffsets = allReplicas.collect {
case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset
}
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 718ed24..dbecba4 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -34,8 +34,8 @@ import scala.collection._
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
- override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
- .format(acksPending, responseStatus.error.code, responseStatus.baseOffset, requiredOffset)
+ override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
+ s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
}
/**
@@ -44,8 +44,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Partitio
case class ProduceMetadata(produceRequiredAcks: Short,
produceStatus: Map[TopicPartition, ProducePartitionStatus]) {
- override def toString = "[requiredAcks: %d, partitionStatus: %s]"
- .format(produceRequiredAcks, produceStatus)
+ override def toString = s"[requiredAcks: $produceRequiredAcks, partitionStatus: $produceStatus]"
}
/**
@@ -69,7 +68,7 @@ class DelayedProduce(delayMs: Long,
status.acksPending = false
}
- trace("Initial partition status for %s is %s".format(topicPartition, status))
+ trace(s"Initial partition status for $topicPartition is $status")
}
/**
@@ -116,6 +115,7 @@ class DelayedProduce(delayMs: Long,
override def onExpiration() {
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
if (status.acksPending) {
+ debug(s"Expiring produce request for partition $topicPartition with status $status")
DelayedProduceMetrics.recordExpiration(topicPartition)
}
}
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index edc010e..effbaa0 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -79,6 +79,6 @@ case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
}
- override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]"
+ override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 2fc203a..e96e1ad 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -417,7 +417,7 @@ object DumpLogSegments {
}
} else {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
- print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
+ print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset + " count: " + batch.countOrNull +
" baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
" producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
" partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.