You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/03/19 20:16:47 UTC

[kafka] branch 2.1 updated: MINOR: Improve logging around index files (#6385)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3275965  MINOR: Improve logging around index files (#6385)
3275965 is described below

commit 327596585c4009aae306ae64467109ce40134105
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Tue Mar 19 02:00:01 2019 -0400

    MINOR: Improve logging around index files (#6385)
    
    This patch adds additional DEBUG statements in AbstractIndex.scala, OffsetIndex.scala, and TimeIndex.scala. It also changes the logging on append from DEBUG to TRACE to make DEBUG logging less disruptive, and it ensures that exceptions raised from index classes include file/offset information.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala    |  3 +++
 core/src/main/scala/kafka/log/OffsetIndex.scala      | 17 ++++++++++-------
 core/src/main/scala/kafka/log/TimeIndex.scala        | 17 +++++++++++------
 core/src/main/scala/kafka/log/TransactionIndex.scala | 13 +++++++------
 4 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index ec9d55f..af28f83 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -174,6 +174,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
 
       if (_length == roundedNewSize) {
+        debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize")
         false
       } else {
         val raf = new RandomAccessFile(file, "rw")
@@ -188,6 +189,8 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
           mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
           _maxEntries = mmap.limit() / entrySize
           mmap.position(position)
+          debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " +
+            s"and limit is ${mmap.limit()}")
           true
         } finally {
           CoreUtils.swallow(raf.close(), this)
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 23dabf7..b11fa6d 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -58,8 +58,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   /* the last offset in the index */
   private[this] var _lastOffset = lastEntry.offset
 
-  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " +
+    s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
 
   /**
    * The last entry in the index
@@ -127,7 +127,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   def entry(n: Int): OffsetPosition = {
     maybeLock(lock) {
       if(n >= _entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, _entries))
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
+          s"which has size ${_entries}.")
       val idx = mmap.duplicate
       OffsetPosition(relativeOffset(idx, n), physical(idx, n))
     }
@@ -135,21 +136,21 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
 
   /**
    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
-   * @throws IndexOffsetOverflowException if the offset causes index offset to overflow
+   * @throws InvalidOffsetException if the index is non-empty and the offset is not larger than the last offset appended
    */
   def append(offset: Long, position: Int) {
     inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
       if (_entries == 0 || offset > _lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
+        trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
         mmap.putInt(relativeOffset(offset))
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
         require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
       } else {
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, entries, _lastOffset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
+          s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
       }
     }
   }
@@ -185,6 +186,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastOffset = lastEntry.offset
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
+        s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 1661cba..6106ed2 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -57,6 +57,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
 
   override def entrySize = 12
 
+  debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," +
+    s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
+
   // We override the full check to reserve the last time index entry slot for the on roll call.
   override def isFull: Boolean = entries >= maxEntries - 1
 
@@ -86,7 +89,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
   def entry(n: Int): TimestampOffset = {
     maybeLock(lock) {
       if(n >= _entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from  time index ${file.getAbsolutePath} " +
+          s"which has size ${_entries}.")
       val idx = mmap.duplicate
       TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
     }
@@ -117,16 +121,16 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
       // 1. A log segment is closed.
       // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
       if (_entries != 0 && offset < lastEntry.offset)
-        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
-          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+        throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
+          s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
       if (_entries != 0 && timestamp < lastEntry.timestamp)
-        throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
-            .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+        throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
+          s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
       // We only append to the time index when the timestamp is greater than the last inserted timestamp.
       // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
       // index will be empty.
       if (timestamp > lastEntry.timestamp) {
-        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+        trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
         mmap.putLong(timestamp)
         mmap.putInt(relativeOffset(offset))
         _entries += 1
@@ -202,6 +206,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
       _entries = entries
       mmap.position(_entries * entrySize)
       _lastEntry = lastEntryFromIndexFile
+      debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}")
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index da7fce8..e730fdb 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -53,7 +53,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
   def append(abortedTxn: AbortedTxn): Unit = {
     lastOffset.foreach { offset =>
       if (offset >= abortedTxn.lastOffset)
-        throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially")
+        throw new IllegalArgumentException(s"The last offset of appended transactions must increase sequentially, but " +
+          s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
     Utils.writeFully(channel, abortedTxn.buffer.duplicate())
@@ -138,8 +139,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
 
               val abortedTxn = new AbortedTxn(buffer)
               if (abortedTxn.version > AbortedTxn.CurrentVersion)
-                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version}, " +
-                  s"current version is ${AbortedTxn.CurrentVersion}")
+                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version} " +
+                  s"in transaction index ${file.getAbsolutePath}, current version is ${AbortedTxn.CurrentVersion}")
               val nextEntry = (abortedTxn, position)
               position += AbortedTxn.TotalSize
               nextEntry
@@ -147,7 +148,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
               case e: IOException =>
                 // We received an unexpected error reading from the index file. We propagate this as an
                 // UNKNOWN error to the consumer, which will cause it to retry the fetch.
-                throw new KafkaException(s"Failed to read from the transaction index $file", e)
+                throw new KafkaException(s"Failed to read from the transaction index ${file.getAbsolutePath}", e)
             }
           }
         }
@@ -187,8 +188,8 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
     for ((abortedTxn, _) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset < startOffset)
-        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn is less than start offset " +
-          s"$startOffset")
+        throw new CorruptIndexException(s"Last offset of aborted transaction $abortedTxn in index " +
+          s"${file.getAbsolutePath} is less than start offset $startOffset")
     }
   }