You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/30 15:59:11 UTC

[kafka] branch trunk updated: MINOR: Ensure exception messages include partition/segment info when possible (#4907)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f467c9c  MINOR: Ensure exception messages include partition/segment info when possible (#4907)
f467c9c is described below

commit f467c9c2438a8b182083879927ff171a6a2c6f2f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Apr 30 08:59:04 2018 -0700

    MINOR: Ensure exception messages include partition/segment info when possible (#4907)
    
    Reviewers: Anna Povzner <an...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../common/record/ByteBufferLogInputStream.java    |  6 +++--
 .../kafka/common/record/FileLogInputStream.java    | 12 +++++----
 .../apache/kafka/common/record/FileRecords.java    | 16 +++++++++---
 .../common/record/FileLogInputStreamTest.java      | 15 ++++-------
 core/src/main/scala/kafka/log/Log.scala            | 30 +++++++++++++---------
 core/src/main/scala/kafka/log/LogCleaner.scala     |  8 +++++-
 core/src/main/scala/kafka/log/LogSegment.scala     |  2 +-
 7 files changed, 54 insertions(+), 35 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index bd54e40..22f417f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -46,9 +46,11 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
         // V0 has the smallest overhead, stricter checking is done later
         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
-            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+            throw new CorruptRecordException(String.format("Record size %d is less than the minimum record overhead (%d)",
+                    recordSize, LegacyRecord.RECORD_OVERHEAD_V0));
         if (recordSize > maxMessageSize)
-            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+            throw new CorruptRecordException(String.format("Record size %d exceeds the largest allowable message size (%d).",
+                    recordSize, maxMessageSize));
 
         int batchSize = recordSize + LOG_OVERHEAD;
         if (remaining < batchSize)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 75eb1b3..a1e3a2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -40,25 +40,26 @@ import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
 public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
     private int position;
     private final int end;
-    private final FileChannel channel;
+    private final FileRecords fileRecords;
     private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
 
     /**
      * Create a new log input stream over the FileChannel
-     * @param channel Underlying FileChannel
+     * @param records Underlying FileRecords instance
      * @param start Position in the file channel to start from
      * @param end Position in the file channel not to read past
      */
-    FileLogInputStream(FileChannel channel,
+    FileLogInputStream(FileRecords records,
                        int start,
                        int end) {
-        this.channel = channel;
+        this.fileRecords = records;
         this.position = start;
         this.end = end;
     }
 
     @Override
     public FileChannelRecordBatch nextBatch() throws IOException {
+        FileChannel channel = fileRecords.channel();
         if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
             return null;
 
@@ -71,7 +72,8 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         // V0 has the smallest overhead, stricter checking is done later
         if (size < LegacyRecord.RECORD_OVERHEAD_V0)
-            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
+            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+                            "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
 
         if (position + LOG_OVERHEAD + size > end)
             return null;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index fd9b4be..6b6e0ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -132,9 +132,9 @@ public class FileRecords extends AbstractRecords implements Closeable {
      */
     public FileRecords read(int position, int size) throws IOException {
         if (position < 0)
-            throw new IllegalArgumentException("Invalid position: " + position);
+            throw new IllegalArgumentException("Invalid position: " + position + " in read from " + file);
         if (size < 0)
-            throw new IllegalArgumentException("Invalid size: " + size);
+            throw new IllegalArgumentException("Invalid size: " + size + " in read from " + file);
 
         int end = this.start + position + size;
         // handle integer overflow or if end is beyond the end of the file
@@ -228,7 +228,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     public int truncateTo(int targetSize) throws IOException {
         int originalSize = sizeInBytes();
         if (targetSize > originalSize || targetSize < 0)
-            throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
+            throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
                     " size of this log segment is " + originalSize + " bytes.");
         if (targetSize < (int) channel.size()) {
             channel.truncate(targetSize);
@@ -347,6 +347,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
         return batches;
     }
 
+    @Override
+    public String toString() {
+        return "FileRecords(file= " + file +
+                ", start=" + start +
+                ", end=" + end +
+                ")";
+    }
+
     private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
         return new Iterable<FileChannelRecordBatch>() {
             @Override
@@ -362,7 +370,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             end = this.end;
         else
             end = this.sizeInBytes();
-        FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
+        FileLogInputStream inputStream = new FileLogInputStream(this, start, end);
         return new RecordBatchIterator<>(inputStream);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index d5de4bd..95b2a0c 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -60,8 +60,7 @@ public class FileLogInputStreamTest {
             fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
             fileRecords.flush();
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-                    fileRecords.sizeInBytes());
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
 
             FileChannelRecordBatch batch = logInputStream.nextBatch();
             assertNotNull(batch);
@@ -90,8 +89,7 @@ public class FileLogInputStreamTest {
             fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
             fileRecords.flush();
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-                    fileRecords.sizeInBytes());
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
 
             FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
             assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord);
@@ -126,8 +124,7 @@ public class FileLogInputStreamTest {
             fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
             fileRecords.flush();
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-                    fileRecords.sizeInBytes());
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
 
             FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
             assertNoProducerData(firstBatch);
@@ -169,8 +166,7 @@ public class FileLogInputStreamTest {
                     producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
             fileRecords.flush();
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-                    fileRecords.sizeInBytes());
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
 
             FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
             assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);
@@ -198,8 +194,7 @@ public class FileLogInputStreamTest {
             fileRecords.flush();
             fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
 
-            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
-                    fileRecords.sizeInBytes());
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
 
             FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
             assertNoProducerData(firstBatch);
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index de4bb29..0b1a18a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -686,7 +686,8 @@ class Log(@volatile var dir: File,
               leaderEpoch,
               isFromClient)
           } catch {
-            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+            case e: IOException =>
+              throw new KafkaException(s"Error validating messages while appending to log $name", e)
           }
           validRecords = validateAndOffsetAssignResult.validatedRecords
           appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
@@ -705,15 +706,16 @@ class Log(@volatile var dir: File,
                 // to be consistent with pre-compression bytesRejectedRate recording
                 brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                 brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-                throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
-                  .format(batch.sizeInBytes, config.maxMessageSize))
+                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
+                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
               }
             }
           }
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
+            throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
+              records.records.asScala.map(_.offset))
         }
 
         // update the epoch cache with the epoch stamped onto the message by the leader
@@ -724,8 +726,8 @@ class Log(@volatile var dir: File,
 
         // check messages set size may be exceed config.segmentSize
         if (validRecords.sizeInBytes > config.segmentSize) {
-          throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
-            .format(validRecords.sizeInBytes, config.segmentSize))
+          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
+            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
         }
 
         // now that we have valid records, offsets assigned, and timestamps updated, we need to
@@ -887,7 +889,8 @@ class Log(@volatile var dir: File,
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
-        throw new InvalidRecordException(s"The baseOffset of the record batch should be 0, but it is ${batch.baseOffset}")
+        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
+          s"be 0, but it is ${batch.baseOffset}")
 
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
@@ -913,8 +916,8 @@ class Log(@volatile var dir: File,
       if (batchSize > config.maxMessageSize) {
         brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
-          s"value of ${config.maxMessageSize}.")
+        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
+          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
       }
 
       // check the validity of the message by checking CRC
@@ -957,7 +960,8 @@ class Log(@volatile var dir: File,
   private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
     val validBytes = info.validBytes
     if (validBytes < 0)
-      throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+      throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " +
+        s"log for $topicPartition. A possible cause is a corrupted produce request.")
     if (validBytes == records.sizeInBytes) {
       records
     } else {
@@ -1011,7 +1015,8 @@ class Log(@volatile var dir: File,
 
       // return error on attempt to read beyond the log end offset or read below log start offset
       if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
-        throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
+        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
+          s"but we only have log segments in the range $logStartOffset to $next.")
 
       // Do the read on the segment with a base offset less than the target offset
       // but if that segment doesn't contain any messages with an offset greater than that
@@ -1375,7 +1380,8 @@ class Log(@volatile var dir: File,
           preallocate = config.preallocate)
         val prev = addSegment(segment)
         if (prev != null)
-          throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+          throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " +
+            s"start offset $newOffset while it already exists.")
         // We need to update the segment base offset and append position data of the metadata when log rolls.
         // The next offset should not change.
         updateLogEndOffset(nextOffsetMetadata.messageOffset)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0dd63b3..2e32250 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -800,7 +800,13 @@ private[log] class Cleaner(val id: Int,
     while (position < segment.log.sizeInBytes) {
       checkDone(topicPartition)
       readBuffer.clear()
-      segment.log.readInto(readBuffer, position)
+      try {
+        segment.log.readInto(readBuffer, position)
+      } catch {
+        case e: Exception =>
+          throw new KafkaException(s"Failed to read from segment $segment of partition $topicPartition " +
+            "while loading offset map", e)
+      }
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5f5f3736..55ab088 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -205,7 +205,7 @@ class LogSegment private[log] (val log: FileRecords,
   def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
            minOneMessage: Boolean = false): FetchDataInfo = {
     if (maxSize < 0)
-      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
 
     val logSize = log.sizeInBytes // this may change, need to save a consistent copy
     val startOffsetAndSize = translateOffset(startOffset)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.