You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/30 15:59:11 UTC
[kafka] branch trunk updated: MINOR: Ensure exception messages include partition/segment info when possible (#4907)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f467c9c MINOR: Ensure exception messages include partition/segment info when possible (#4907)
f467c9c is described below
commit f467c9c2438a8b182083879927ff171a6a2c6f2f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 30 08:59:04 2018 -0700
MINOR: Ensure exception messages include partition/segment info when possible (#4907)
Reviewers: Anna Povzner <an...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
.../common/record/ByteBufferLogInputStream.java | 6 +++--
.../kafka/common/record/FileLogInputStream.java | 12 +++++----
.../apache/kafka/common/record/FileRecords.java | 16 +++++++++---
.../common/record/FileLogInputStreamTest.java | 15 ++++-------
core/src/main/scala/kafka/log/Log.scala | 30 +++++++++++++---------
core/src/main/scala/kafka/log/LogCleaner.scala | 8 +++++-
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
7 files changed, 54 insertions(+), 35 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index bd54e40..22f417f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -46,9 +46,11 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
// V0 has the smallest overhead, stricter checking is done later
if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+ throw new CorruptRecordException(String.format("Record size %d is less than the minimum record overhead (%d)",
+ recordSize, LegacyRecord.RECORD_OVERHEAD_V0));
if (recordSize > maxMessageSize)
- throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+ throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
+ recordSize, maxMessageSize));
int batchSize = recordSize + LOG_OVERHEAD;
if (remaining < batchSize)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 75eb1b3..a1e3a2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -40,25 +40,26 @@ import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
private int position;
private final int end;
- private final FileChannel channel;
+ private final FileRecords fileRecords;
private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
/**
* Create a new log input stream over the FileChannel
- * @param channel Underlying FileChannel
+ * @param records Underlying FileRecords instance
* @param start Position in the file channel to start from
* @param end Position in the file channel not to read past
*/
- FileLogInputStream(FileChannel channel,
+ FileLogInputStream(FileRecords records,
int start,
int end) {
- this.channel = channel;
+ this.fileRecords = records;
this.position = start;
this.end = end;
}
@Override
public FileChannelRecordBatch nextBatch() throws IOException {
+ FileChannel channel = fileRecords.channel();
if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
return null;
@@ -71,7 +72,8 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
// V0 has the smallest overhead, stricter checking is done later
if (size < LegacyRecord.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
+ throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+ "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
if (position + LOG_OVERHEAD + size > end)
return null;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index fd9b4be..6b6e0ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -132,9 +132,9 @@ public class FileRecords extends AbstractRecords implements Closeable {
*/
public FileRecords read(int position, int size) throws IOException {
if (position < 0)
- throw new IllegalArgumentException("Invalid position: " + position);
+ throw new IllegalArgumentException("Invalid position: " + position + " in read from " + file);
if (size < 0)
- throw new IllegalArgumentException("Invalid size: " + size);
+ throw new IllegalArgumentException("Invalid size: " + size + " in read from " + file);
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
@@ -228,7 +228,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
public int truncateTo(int targetSize) throws IOException {
int originalSize = sizeInBytes();
if (targetSize > originalSize || targetSize < 0)
- throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+ throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
" size of this log segment is " + originalSize + " bytes.");
if (targetSize < (int) channel.size()) {
channel.truncate(targetSize);
@@ -347,6 +347,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
return batches;
}
+ @Override
+ public String toString() {
+ return "FileRecords(file= " + file +
+ ", start=" + start +
+ ", end=" + end +
+ ")";
+ }
+
private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
return new Iterable<FileChannelRecordBatch>() {
@Override
@@ -362,7 +370,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
end = this.end;
else
end = this.sizeInBytes();
- FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
+ FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
return new RecordBatchIterator<>(inputStream);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index d5de4bd..95b2a0c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -60,8 +60,7 @@ public class FileLogInputStreamTest {
fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
fileRecords.flush();
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
- fileRecords.sizeInBytes());
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
FileChannelRecordBatch batch = logInputStream.nextBatch();
assertNotNull(batch);
@@ -90,8 +89,7 @@ public class FileLogInputStreamTest {
fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
fileRecords.flush();
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
- fileRecords.sizeInBytes());
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord);
@@ -126,8 +124,7 @@ public class FileLogInputStreamTest {
fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
fileRecords.flush();
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
- fileRecords.sizeInBytes());
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
assertNoProducerData(firstBatch);
@@ -169,8 +166,7 @@ public class FileLogInputStreamTest {
producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
fileRecords.flush();
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
- fileRecords.sizeInBytes());
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);
@@ -198,8 +194,7 @@ public class FileLogInputStreamTest {
fileRecords.flush();
fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
- FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
- fileRecords.sizeInBytes());
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
assertNoProducerData(firstBatch);
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index de4bb29..0b1a18a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -686,7 +686,8 @@ class Log(@volatile var dir: File,
leaderEpoch,
isFromClient)
} catch {
- case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+ case e: IOException =>
+ throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
@@ -705,15 +706,16 @@ class Log(@volatile var dir: File,
// to be consistent with pre-compression bytesRejectedRate recording
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
- throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
- .format(batch.sizeInBytes, config.maxMessageSize))
+ throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
+ s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
}
}
}
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
- throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
+ throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
+ records.records.asScala.map(_.offset))
}
// update the epoch cache with the epoch stamped onto the message by the leader
@@ -724,8 +726,8 @@ class Log(@volatile var dir: File,
// check messages set size may be exceed config.segmentSize
if (validRecords.sizeInBytes > config.segmentSize) {
- throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
- .format(validRecords.sizeInBytes, config.segmentSize))
+ throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
+ s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
// now that we have valid records, offsets assigned, and timestamps updated, we need to
@@ -887,7 +889,8 @@ class Log(@volatile var dir: File,
for (batch <- records.batches.asScala) {
// we only validate V2 and higher to avoid potential compatibility issues with older clients
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
- throw new InvalidRecordException(s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
+ throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
+ s"be 0, but it is ${batch.baseOffset}")
// update the first offset if on the first message. For magic versions older than 2, we use the last offset
// to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
@@ -913,8 +916,8 @@ class Log(@volatile var dir: File,
if (batchSize > config.maxMessageSize) {
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
- throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
- s"value of ${config.maxMessageSize}.")
+ throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
+ s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
}
// check the validity of the message by checking CRC
@@ -957,7 +960,8 @@ class Log(@volatile var dir: File,
private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
val validBytes = info.validBytes
if (validBytes < 0)
- throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+ throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " +
+ s"log for $topicPartition. A possible cause is a corrupted produce request.")
if (validBytes == records.sizeInBytes) {
records
} else {
@@ -1011,7 +1015,8 @@ class Log(@volatile var dir: File,
// return error on attempt to read beyond the log end offset or read below log start offset
if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
- throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
+ throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
+ s"but we only have log segments in the range $logStartOffset to $next.")
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
@@ -1375,7 +1380,8 @@ class Log(@volatile var dir: File,
preallocate = config.preallocate)
val prev = addSegment(segment)
if (prev != null)
- throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+ throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " +
+ s"start offset $newOffset while it already exists.")
// We need to update the segment base offset and append position data of the metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(nextOffsetMetadata.messageOffset)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0dd63b3..2e32250 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -800,7 +800,13 @@ private[log] class Cleaner(val id: Int,
while (position < segment.log.sizeInBytes) {
checkDone(topicPartition)
readBuffer.clear()
- segment.log.readInto(readBuffer, position)
+ try {
+ segment.log.readInto(readBuffer, position)
+ } catch {
+ case e: Exception =>
+ throw new KafkaException(s"Failed to read from segment $segment of partition $topicPartition " +
+ "while loading offset map", e)
+ }
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5f5f3736..55ab088 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -205,7 +205,7 @@ class LogSegment private[log] (val log: FileRecords,
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
- throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+ throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startOffsetAndSize = translateOffset(startOffset)
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.